diff --git a/firmware_hunter_pro_v4.py b/firmware_hunter_pro_v4.py deleted file mode 100644 index ac0d781..0000000 --- a/firmware_hunter_pro_v4.py +++ /dev/null @@ -1,1387 +0,0 @@ -#!/usr/bin/env python3 -""" -Firmware Hunter Pro v4.0 -A safer offline firmware triage framework for extracted router / IoT firmware. - -What it does: -- Scans extracted firmware directories safely without executing firmware binaries. -- Optionally extracts a firmware image with binwalk if binwalk is installed. -- Finds credentials, keys, certs, IPs, domains, URLs, MACs, JWTs, API keys, hashes, CVEs. -- Maps web UI files, CGI handlers, forms, scripts, endpoints, and admin-looking routes. -- Detects ELF binaries, architecture hints, BusyBox, Linux kernel, OpenSSL, Dropbear, dnsmasq, uClibc/musl/glibc. -- Scores findings with severity and confidence. -- Produces TXT, JSON, HTML, Markdown, CSV, and separate evidence files. -- Supports simple external plugins from a plugins directory. - -Safe by default: -- Does not execute firmware binaries. -- Only reads files and optionally runs host tools like strings, file, binwalk, yara. 
- -Usage: - python3 firmware_hunter_pro.py /path/to/squashfs-root - python3 firmware_hunter_pro.py firmware.bin --extract - python3 firmware_hunter_pro.py /path/to/rootfs --quick - python3 firmware_hunter_pro.py /path/to/rootfs --yara rules.yar - python3 firmware_hunter_pro.py /path/to/rootfs --plugins plugins/ -""" - -import os -import re -import csv -import sys -import json -import math -import time -import html -import shutil -import hashlib -import argparse -import tempfile -import subprocess -import importlib.util -from pathlib import Path -from datetime import datetime -from collections import Counter, defaultdict -from concurrent.futures import ThreadPoolExecutor, as_completed -from threading import Lock - -VERSION = "4.0" -REPORT = defaultdict(list) -REPORT_LOCK = Lock() - -# ============================================================================= -# CONFIG -# ============================================================================= - -TEXT_EXTENSIONS = { - ".txt", ".log", ".conf", ".cfg", ".ini", ".json", ".xml", ".html", ".htm", - ".js", ".php", ".asp", ".cgi", ".lua", ".sh", ".service", ".default", - ".profile", ".passwd", ".shadow", ".pem", ".key", ".crt", ".pub", ".yaml", - ".yml", ".env", ".properties", ".rc", ".rules" -} - -INTERESTING_NAMES = { - "passwd", "shadow", "group", "hosts", "resolv.conf", "inittab", "fstab", - "rcS", "rc.local", "profile", "authorized_keys", "known_hosts", "motd", - "issue", "services", "inetd", "inetd.conf", "udhcpd.conf", "dnsmasq.conf", - "dropbear", "telnetd", "lighttpd.conf", "boa.conf", "httpd.conf", - "mini_httpd.conf", "uhttpd.conf", "nginx.conf", "config.xml", "nvram", - "default.cfg", "wpa_supplicant.conf", "os-release", "version", "release", - "syslog.conf", "crontab" -} - -EXECUTABLE_NAMES = { - "busybox", "telnetd", "dropbear", "sshd", "httpd", "boa", "lighttpd", - "mini_httpd", "uhttpd", "nginx", "nc", "netcat", "wget", "curl", "tftp", - "ftpget", "ftpput", "iptables", "ip6tables", "dnsmasq", 
"udhcpd", "pppd", - "openvpn", "openssl", "sqlite3", "ash", "sh" -} - -SUSPICIOUS_KEYWORDS = { - "telnetd", "dropbear", "busybox telnet", "nc -l", "netcat", "/bin/sh", - "/bin/ash", "reverse shell", "backdoor", "wget http", "curl http", "tftp", - "ftpget", "ftpput", "chmod 777", "0.0.0.0", "admin:admin", "root:root", - "password", "passwd", "shadow", "debug", "test", "factory", "developer", - "enable telnet", "remote shell", "hardcoded", "debug shell", "diagnostic", - "support account", "superuser", "hidden", "maintenance" -} - -MALWARE_IOC_KEYWORDS = { - "mirai": ["mirai", "busybox MIRAI", "/bin/busybox", "report.%s", "scanListen"], - "gafgyt/bashlite": ["gafgyt", "bashlite", "gayfgt", "loligang", "telnet scanner"], - "mozi": ["mozi", "Mozi.m", "dht.transmissionbt.com", "router.bittorrent.com"], - "xorddos": ["xorddos", "x0r", "/tmp/.x", "BB2FA36AAA9541F0"], - "miner": ["stratum+tcp", "xmrig", "minerd", "cryptonight", "monero"], - "generic_bot": ["CNC", "C2", "botnet", "udp flood", "syn flood", "http flood"] -} - -REGEX_PATTERNS = { - "ipv4": r"\b(?:\d{1,3}\.){3}\d{1,3}\b", - "mac_address": r"\b(?:[0-9a-fA-F]{2}[:-]){5}[0-9a-fA-F]{2}\b", - "url": r"https?://[^\s'\"<>]{4,}", - "email": r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", - "domain": r"\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|cn|ru|info|biz|us|uk|co|dev|cloud|local|lan)\b", - "md5": r"\b[a-fA-F0-9]{32}\b", - "sha1": r"\b[a-fA-F0-9]{40}\b", - "sha256": r"\b[a-fA-F0-9]{64}\b", - "jwt": r"eyJ[a-zA-Z0-9_\-]+\.[a-zA-Z0-9_\-]+\.[a-zA-Z0-9_\-]+", - "api_key": r"(?i)(api[_-]?key|apikey|access[_-]?key|secret[_-]?key|client_secret)\s*[:=]\s*['\"]?([a-zA-Z0-9_\-]{12,})", - "wifi_psk": r"(?i)(passphrase|psk|wpa_pass|wifi_pass|wifi_password|wireless_key|wpakey)\s*[:=]\s*['\"]?([^\s'\";&]{8,63})", - "possible_password": r"(?i)(password|passwd|pwd|pass|admin_pass|root_pass|web_pass|wifi_pass|secret|token)\s*[:=]\s*['\"]?([^\s'\";&]{4,})", - "possible_username": 
r"(?i)(username|user|login|admin_user|root_user)\s*[:=]\s*['\"]?([^\s'\";&]{3,})", - "mqtt_credential": r"(?i)(mqtt_(?:user|username|pass|password)|mqttUser|mqttPass)\s*[:=]\s*['\"]?([^\s'\";&]{3,})", - "basic_auth": r"(?i)authorization:\s*basic\s+([a-zA-Z0-9+/=]{8,})", -} - -PRIVATE_KEY_MARKERS = { - "-----BEGIN RSA PRIVATE KEY-----", - "-----BEGIN DSA PRIVATE KEY-----", - "-----BEGIN EC PRIVATE KEY-----", - "-----BEGIN OPENSSH PRIVATE KEY-----", - "-----BEGIN PRIVATE KEY-----" -} - -CERT_MARKERS = {"-----BEGIN CERTIFICATE-----"} - -COMPONENT_PATTERNS = { - "busybox": [ - r"BusyBox v([\w.\-]+)", - r"busybox\s+v([\w.\-]+)" - ], - "linux_kernel": [ - r"Linux version ([\w.\-]+)", - r"kernel version[:= ]+([\w.\-]+)" - ], - "openssl": [ - r"OpenSSL\s+([0-9][\w.\-]+[a-z]?)", - r"openssl-([0-9][\w.\-]+)" - ], - "dropbear": [ - r"Dropbear sshd v?([0-9][\w.\-]+)", - r"dropbear[_ -]?([0-9]{4}\.[0-9]{2})" - ], - "dnsmasq": [ - r"dnsmasq-?([0-9][\w.\-]+)", - r"dnsmasq version ([0-9][\w.\-]+)" - ], - "uClibc": [ - r"uClibc-?([0-9][\w.\-]+)" - ], - "musl": [ - r"musl-?([0-9][\w.\-]+)" - ], - "glibc": [ - r"GNU C Library.*?release version ([0-9][\w.\-]+)", - r"GLIBC_([0-9.]+)" - ], - "lighttpd": [ - r"lighttpd/([0-9][\w.\-]+)" - ], - "boa": [ - r"Boa/([0-9][\w.\-]+)" - ], - "uhttpd": [ - r"uhttpd[-/ ]([0-9][\w.\-]+)" - ] -} - -CVE_HINTS = { - "busybox": { - "note": "BusyBox is frequently old in embedded firmware. Verify exact version against NVD/vendor advisories.", - "keywords": ["busybox", "ash", "udhcp", "telnetd"] - }, - "openssl": { - "note": "OpenSSL version detected. Check for old TLS/crypto CVEs and weak certificate/key usage.", - "keywords": ["openssl", "libssl", "libcrypto"] - }, - "dropbear": { - "note": "Dropbear SSH detected. Check version against Dropbear security advisories.", - "keywords": ["dropbear", "sshd"] - }, - "dnsmasq": { - "note": "dnsmasq detected. 
Check version for DNS/DHCP vulnerabilities.", - "keywords": ["dnsmasq"] - }, - "boa": { - "note": "Boa web server is commonly outdated in IoT firmware.", - "keywords": ["boa"] - } -} - -IMPORTANT_SECTIONS = [ - ("Credential Findings", "credential_findings", "credential_findings.txt"), - ("Possible Passwords", "possible_password", "possible_passwords.txt"), - ("Wi-Fi PSKs", "wifi_psk", "wifi_psks.txt"), - ("Possible Usernames", "possible_username", "possible_usernames.txt"), - ("API Keys / Secrets", "api_key", "api_keys.txt"), - ("MQTT Credentials", "mqtt_credential", "mqtt_credentials.txt"), - ("JWT Tokens", "jwt", "jwt_tokens.txt"), - ("Private Keys", "private_keys", "private_keys.txt"), - ("Certificates", "certificates", "certificates.txt"), - ("IP Addresses", "ipv4", "ip_addresses.txt"), - ("MAC Addresses", "mac_address", "mac_addresses.txt"), - ("URLs", "url", "urls.txt"), - ("Domains", "domain", "domains.txt"), - ("Emails", "email", "emails.txt"), - ("Component Versions", "components", "components.txt"), - ("Version Strings", "version_strings", "version_strings.txt"), - ("Firmware Identity", "firmware_identity", "firmware_identity.txt"), - ("Startup Scripts", "startup_scripts", "startup_scripts.txt"), - ("Interesting Files", "interesting_files", "interesting_files.txt"), - ("Interesting Binaries", "interesting_binaries", "interesting_binaries.txt"), - ("ELF Binaries", "elf_binaries", "elf_binaries.txt"), - ("Architecture Summary", "architecture_summary", "architecture_summary.txt"), - ("Web Files", "web_files", "web_files.txt"), - ("Web Routes", "web_routes", "web_routes.txt"), - ("Web Endpoints", "web_endpoints", "web_endpoints.txt"), - ("Cron Jobs", "cron_jobs", "cron_jobs.txt"), - ("Users / Groups", "users_groups", "users_groups.txt"), - ("SSH Related Files", "ssh_related", "ssh_related.txt"), - ("Suspicious Keywords", "suspicious_keywords", "suspicious_keywords.txt"), - ("Malware IOC Matches", "malware_iocs", "malware_iocs.txt"), - ("High Entropy Files", 
"high_entropy_files", "high_entropy_files.txt"), - ("Largest Files", "largest_files", "largest_files.txt"), - ("BusyBox Findings", "busybox", "busybox.txt"), - ("CVE References", "cve_references", "cve_references.txt"), - ("CVE Hints", "cve_hints", "cve_hints.txt"), - ("YARA Matches", "yara_matches", "yara_matches.txt"), - ("Plugin Findings", "plugin_findings", "plugin_findings.txt"), - ("Skipped Large Files", "skipped_large_files", "skipped_large_files.txt"), - ("Tool Warnings", "tool_warnings", "tool_warnings.txt"), - ("Scan Errors", "scan_errors", "scan_errors.txt"), -] - - -# ============================================================================= -# BASIC HELPERS -# ============================================================================= - -def add(category, data): - """Thread-safe de-duplicated report append.""" - with REPORT_LOCK: - if data not in REPORT[category]: - REPORT[category].append(data) - - -def safe_rel(path, root): - try: - return str(Path(path).resolve().relative_to(Path(root).resolve())) - except Exception: - return str(path) - - -def now_iso(): - return datetime.now().isoformat(timespec="seconds") - - -def check_dependencies(yara_requested=False, extract_requested=False): - dependencies = ["strings"] - if yara_requested: - dependencies.append("yara") - if extract_requested: - dependencies.append("binwalk") - - for tool in dependencies: - if shutil.which(tool) is None: - add("tool_warnings", { - "tool": tool, - "warning": f"'{tool}' was not found in PATH. Some features may be limited." - }) - print(f"[!] Warning: '{tool}' not found. 
Some features may be limited.") - - -def sha256_file(path): - try: - h = hashlib.sha256() - with open(path, "rb") as f: - for chunk in iter(lambda: f.read(1024 * 1024), b""): - h.update(chunk) - return h.hexdigest() - except Exception: - return None - - -def entropy_file(path, max_bytes=1024 * 1024): - try: - with open(path, "rb") as f: - data = f.read(max_bytes) - if not data: - return 0.0 - counts = Counter(data) - length = len(data) - entropy = -sum((count / length) * math.log2(count / length) for count in counts.values()) - return round(entropy, 4) - except Exception: - return None - - -def is_text_file(path): - if path.suffix.lower() in TEXT_EXTENSIONS: - return True - try: - with open(path, "rb") as f: - chunk = f.read(4096) - if not chunk: - return False - return chunk.count(b"\x00") / len(chunk) < 0.08 - except Exception: - return False - - -def read_text(path, max_chars=None): - try: - text = path.read_text(errors="ignore") - if max_chars: - return text[:max_chars] - return text - except Exception: - return "" - - -def is_elf(path): - try: - with open(path, "rb") as f: - return f.read(4) == b"\x7fELF" - except Exception: - return False - - -def elf_info(path): - try: - with open(path, "rb") as f: - header = f.read(20) - - if len(header) < 20: - return {} - - elf_class = header[4] - endian = header[5] - byte_order = "little" if endian == 1 else "big" if endian == 2 else "little" - machine = int.from_bytes(header[18:20], byte_order) - - arch_map = { - 3: "x86", - 8: "MIPS", - 20: "PowerPC", - 40: "ARM", - 62: "x86_64", - 183: "AArch64", - 243: "RISC-V" - } - - return { - "class": "32-bit" if elf_class == 1 else "64-bit" if elf_class == 2 else "unknown", - "endian": "little" if endian == 1 else "big" if endian == 2 else "unknown", - "machine": arch_map.get(machine, f"unknown-{machine}") - } - except Exception: - return {} - - -def run_strings(path, limit=1200): - try: - result = subprocess.run( - ["strings", str(path)], - capture_output=True, - text=True, - 
timeout=20, - errors="ignore" - ) - return result.stdout.splitlines()[:limit] - except Exception: - return [] - - -def run_file_cmd(path): - if shutil.which("file") is None: - return None - try: - result = subprocess.run( - ["file", "-b", str(path)], - capture_output=True, - text=True, - timeout=5, - errors="ignore" - ) - return result.stdout.strip() - except Exception: - return None - - -# ============================================================================= -# EXTRACTION -# ============================================================================= - -def extract_with_binwalk(firmware_image, output_base): - firmware_image = Path(firmware_image).resolve() - extract_dir = Path(output_base).resolve() / f"binwalk_extract_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" - extract_dir.mkdir(parents=True, exist_ok=True) - - if shutil.which("binwalk") is None: - raise RuntimeError("binwalk not found. Install it or scan an already extracted rootfs directory.") - - print(f"[+] Extracting with binwalk into: {extract_dir}") - try: - subprocess.run( - ["binwalk", "-Me", str(firmware_image), "--directory", str(extract_dir)], - check=False, - timeout=1800 - ) - except subprocess.TimeoutExpired: - add("tool_warnings", {"tool": "binwalk", "warning": "binwalk extraction timed out"}) - except Exception as e: - add("tool_warnings", {"tool": "binwalk", "warning": f"binwalk extraction failed: {e}"}) - - candidates = find_rootfs_candidates(extract_dir) - if candidates: - print("[+] Possible rootfs candidates:") - for idx, cand in enumerate(candidates[:10], 1): - print(f" {idx}. {cand}") - return candidates[0] - - print("[!] No obvious rootfs found. 
Scanning extraction directory instead.") - return extract_dir - - -def find_rootfs_candidates(base_dir): - base_dir = Path(base_dir) - candidates = [] - - for dirpath, dirnames, filenames in os.walk(base_dir): - d = Path(dirpath) - names = set(filenames) | set(dirnames) - score = 0 - - if "etc" in names: - score += 3 - if "bin" in names: - score += 2 - if "sbin" in names: - score += 2 - if "www" in names or "web" in names or "htdocs" in names: - score += 2 - if "passwd" in names or "shadow" in names: - score += 2 - if "init.d" in names: - score += 2 - - if score >= 5: - candidates.append((score, d)) - - candidates.sort(key=lambda x: x[0], reverse=True) - return [c[1] for c in candidates] - - -# ============================================================================= -# SCANNERS -# ============================================================================= - -def extract_version_strings(text): - patterns = [ - r"(?i)(version|build|release|fw_ver|firmware|software|sw_ver|hardware|hw_ver|model|vendor|product)\s*[:=]\s*['\"]?([\w.\-/ ]{3,80})", - r"(?i)(busybox\s+v\d{1,3}\.\d{1,3}(?:\.\d{1,5})?)", - r"(?i)(linux\s+version\s+[\w.\-]+)", - r"\bV(\d{1,3}\.\d{1,3}\.\d{1,5})\b", - r"\b(\d{4}-\d{2}-\d{2})\b", - ] - - versions = set() - - for pattern in patterns: - for match in re.findall(pattern, text): - if isinstance(match, tuple): - value = match[-1] - else: - value = match - value = " ".join(value.strip().split()) - if 3 <= len(value) <= 120: - versions.add(value) - - return sorted(versions)[:40] - - -def detect_components(text, rel_path): - for component, patterns in COMPONENT_PATTERNS.items(): - for pattern in patterns: - for match in re.findall(pattern, text, re.I): - version = match[-1] if isinstance(match, tuple) else match - version = str(version).strip() - if version: - add("components", { - "component": component, - "version": version, - "file": rel_path - }) - if component in CVE_HINTS: - add("cve_hints", { - "component": component, - "version": version, 
- "file": rel_path, - "note": CVE_HINTS[component]["note"] - }) - - -def detect_malware_iocs(text, rel_path): - low = text.lower() - for family, indicators in MALWARE_IOC_KEYWORDS.items(): - hits = [] - for indicator in indicators: - if indicator.lower() in low: - hits.append(indicator) - if hits: - add("malware_iocs", { - "family_or_category": family, - "file": rel_path, - "indicators": sorted(set(hits)), - "confidence": "medium" if len(hits) == 1 else "high" - }) - - -def scan_text_content(path, content, rel_path): - for name, pattern in REGEX_PATTERNS.items(): - for match in re.findall(pattern, content): - if isinstance(match, tuple): - finding = {"file": rel_path, "key": match[0], "value": match[1]} - value_for_validation = match[1] - else: - finding = {"file": rel_path, "value": match} - value_for_validation = match - - if name == "ipv4": - parts = value_for_validation.split(".") - if not all(p.isdigit() and 0 <= int(p) <= 255 for p in parts): - continue - - add(name, finding) - - if name in {"possible_password", "wifi_psk", "api_key", "mqtt_credential"}: - add_credential_finding(name, finding) - - for marker in PRIVATE_KEY_MARKERS: - if marker in content: - add("private_keys", {"file": rel_path, "marker": marker}) - add("credential_findings", { - "file": rel_path, - "type": "private_key", - "value": marker, - "confidence": "high", - "severity": "critical", - "reason": "Private key material marker found" - }) - - for marker in CERT_MARKERS: - if marker in content: - add("certificates", {"file": rel_path, "marker": marker}) - - lowered = content.lower() - for keyword in SUSPICIOUS_KEYWORDS: - if keyword.lower() in lowered: - add("suspicious_keywords", {"file": rel_path, "keyword": keyword}) - - versions = extract_version_strings(content) - if versions: - add("version_strings", {"file": rel_path, "versions": versions}) - - detect_components(content, rel_path) - detect_malware_iocs(content, rel_path) - - -def add_credential_finding(kind, finding): - value = 
str(finding.get("value", "")) - key = str(finding.get("key", "")) - file = finding.get("file", "") - - confidence = "medium" - severity = "medium" - reasons = [] - - if kind in {"api_key", "wifi_psk"}: - confidence = "high" - severity = "high" - reasons.append(f"{kind} pattern matched") - - if key.lower() in {"password", "passwd", "pwd", "admin_pass", "root_pass", "secret", "token"}: - confidence = "high" - severity = "high" - reasons.append("credential-like key name") - - if value.lower() in {"admin", "root", "password", "123456", "12345678", "admin123", "root123"}: - confidence = "high" - severity = "high" - reasons.append("common/default credential value") - - if any(x in file.lower() for x in ["shadow", "passwd", "default", "config", "nvram", "wpa"]): - reasons.append("sensitive config path") - if confidence == "medium": - confidence = "high" - - add("credential_findings", { - "file": file, - "type": kind, - "key": key, - "value": value, - "confidence": confidence, - "severity": severity, - "reason": "; ".join(reasons) if reasons else "credential-like pattern matched" - }) - - -def classify_path(path, root, size, rel_path): - lower = str(path).lower() - name = path.name.lower() - - info = {"file": rel_path, "sha256": sha256_file(path), "size": size} - - if name in INTERESTING_NAMES: - add("interesting_files", info) - - if any(p in lower for p in ["/etc/init.d", "/etc/rc", "/etc/inittab", "/etc/services", "/lib/systemd"]): - add("startup_scripts", info) - - if any(p in lower for p in ["/www", "/web", "/htdocs", "/cgi-bin", "/var/www"]): - add("web_files", info) - - if any(x in name for x in ["config", ".conf", ".cfg", ".ini", ".json", ".xml", ".yaml", ".yml"]): - add("config_files", info) - - if "cron" in lower or "crontab" in lower: - add("cron_jobs", info) - - if name in {"passwd", "shadow", "group"}: - add("users_groups", info) - - if any(x in lower for x in ["ssh", "dropbear", "authorized_keys", "host_key"]): - add("ssh_related", info) - - if name in 
EXECUTABLE_NAMES: - add("interesting_binaries", {**info, "reason": "Common embedded Linux service or utility"}) - - -def scan_busybox(path, text, rel_path): - if "busybox" not in str(path).lower() and "busybox" not in text.lower(): - return - - banner = re.search(r"BusyBox v[\w.\-]+", text) - applet_hits = [] - applet_keywords = ["telnet", "wget", "tftp", "httpd", "ash", "sh", "nc", "ftpget", "ftpput"] - - for line in text.splitlines(): - low = line.lower() - if any(x in low for x in applet_keywords) and len(line.strip()) < 250: - applet_hits.append(line.strip()) - - risky_applets = [] - for app in applet_keywords: - if app in text.lower(): - risky_applets.append(app) - - add("busybox", { - "file": rel_path, - "banner": banner.group(0) if banner else None, - "risky_applet_hints": sorted(set(risky_applets)), - "possible_applets_or_strings": sorted(set(applet_hits))[:100] - }) - - -def scan_web_endpoints(path, content, rel_path): - lower_path = str(path).lower() - - if not any(x in lower_path for x in ["/www", "/web", "/htdocs", "/cgi-bin", ".html", ".js", ".php", ".cgi", ".asp"]): - return - - endpoints = re.findall(r"[\"'](/[^\"'\s<>]{2,})[\"']", content) - forms = re.findall(r"(?i)]+action=[\"']?([^\"'>\s]+)", content) - scripts = re.findall(r"(?i)]+src=[\"']?([^\"'>\s]+)", content) - ajax = re.findall(r"(?i)(?:url|href|src)\s*[:=]\s*[\"']([^\"']+)[\"']", content) - cgi = re.findall(r"[\w./-]+\.cgi(?:\?[^\"'\s<>]*)?", content) - - admin_hits = [] - for route in set(endpoints + forms + scripts + ajax + cgi): - if any(k in route.lower() for k in ["admin", "login", "password", "upgrade", "firmware", "reboot", "debug", "shell", "system", "config"]): - admin_hits.append(route) - - if endpoints or forms or scripts or ajax or cgi: - data = { - "file": rel_path, - "endpoints": sorted(set(endpoints))[:150], - "forms": sorted(set(forms))[:80], - "scripts": sorted(set(scripts))[:80], - "ajax_or_refs": sorted(set(ajax))[:120], - "cgi_refs": sorted(set(cgi))[:120], - 
"admin_like_routes": sorted(set(admin_hits))[:120] - } - add("web_endpoints", data) - - for route in sorted(set(endpoints + forms + scripts + ajax + cgi + admin_hits)): - if len(route) < 300: - add("web_routes", { - "route": route, - "file": rel_path, - "admin_like": route in admin_hits - }) - - -def scan_possible_cves(content, rel_path): - for cve in re.findall(r"CVE-\d{4}-\d{4,7}", content, re.I): - add("cve_references", {"file": rel_path, "cve": cve.upper()}) - - -def yara_scan_file(path, rel_path, yara_rule_path): - if not yara_rule_path: - return - - try: - result = subprocess.run( - ["yara", "-r", str(yara_rule_path), str(path)], - capture_output=True, - text=True, - timeout=25 - ) - - if result.stdout.strip(): - add("yara_matches", { - "file": rel_path, - "matches": result.stdout.strip().splitlines() - }) - - except FileNotFoundError: - add("tool_warnings", {"tool": "yara", "warning": "YARA is not installed or not in PATH"}) - except Exception as e: - add("scan_errors", {"file": rel_path, "error": f"YARA scan failed: {e}"}) - - -def scan_file(path, root, yara_rules=None, quick=False): - try: - if not path.is_file(): - return - - size = path.stat().st_size - rel_path = safe_rel(path, root) - - classify_path(path, root, size, rel_path) - - if quick and size > 10_000_000: - add("skipped_large_files", { - "file": rel_path, - "size": size, - "reason": "Skipped because --quick mode is enabled" - }) - return - - ent = entropy_file(path) - if ent is not None and ent >= 7.5 and size > 1024: - add("high_entropy_files", { - "file": rel_path, - "entropy": ent, - "size": size, - "sha256": sha256_file(path), - "note": "May indicate compression, encryption, packed data, or a binary blob" - }) - - if is_elf(path): - info = elf_info(path) - file_desc = run_file_cmd(path) - add("elf_binaries", { - "file": rel_path, - "sha256": sha256_file(path), - "size": size, - "elf": info, - "file_cmd": file_desc - }) - - strings_text = "\n".join(run_strings(path)) - 
scan_text_content(path, strings_text, rel_path) - scan_busybox(path, strings_text, rel_path) - scan_possible_cves(strings_text, rel_path) - - if is_text_file(path): - content = read_text(path) - scan_text_content(path, content, rel_path) - scan_web_endpoints(path, content, rel_path) - scan_possible_cves(content, rel_path) - scan_busybox(path, content, rel_path) - - if yara_rules: - yara_scan_file(path, rel_path, yara_rules) - - except Exception as e: - add("scan_errors", {"file": str(path), "error": str(e)}) - - -# ============================================================================= -# POST PROCESSING -# ============================================================================= - -def detect_firmware_identity(root): - identity_files = [ - "etc/os-release", "etc/openwrt_release", "etc/openwrt_version", - "etc/version", "etc/banner", "etc/issue", "version", "release" - ] - - for rel in identity_files: - path = Path(root) / rel - if path.exists() and path.is_file(): - text = read_text(path, max_chars=5000) - if text: - add("firmware_identity", { - "file": rel, - "content_preview": text[:1000] - }) - - # Heuristic identity from config/version strings - vendor_model_keywords = ["model", "vendor", "product", "device", "board", "firmware", "version"] - for item in REPORT.get("version_strings", [])[:200]: - versions = item.get("versions", []) - hits = [v for v in versions if any(k in v.lower() for k in vendor_model_keywords)] - if hits: - add("firmware_identity", { - "file": item.get("file"), - "identity_hints": hits[:20] - }) - - -def summarize_architecture(): - machines = Counter() - endians = Counter() - classes = Counter() - - for item in REPORT.get("elf_binaries", []): - elf = item.get("elf", {}) - if elf.get("machine"): - machines[elf["machine"]] += 1 - if elf.get("endian"): - endians[elf["endian"]] += 1 - if elf.get("class"): - classes[elf["class"]] += 1 - - if machines or endians or classes: - add("architecture_summary", { - "machines": 
dict(machines.most_common()), - "endian": dict(endians.most_common()), - "class": dict(classes.most_common()) - }) - - -def summarize_largest_files(root, limit=30): - files = [] - for dirpath, _, filenames in os.walk(root): - for filename in filenames: - p = Path(dirpath) / filename - try: - if p.is_file(): - files.append((p.stat().st_size, safe_rel(p, root), sha256_file(p))) - except Exception: - pass - - for size, rel, digest in sorted(files, reverse=True)[:limit]: - add("largest_files", {"file": rel, "size": size, "sha256": digest}) - - -def run_plugins(plugin_dir, root): - if not plugin_dir: - return - - plugin_dir = Path(plugin_dir) - if not plugin_dir.exists(): - add("tool_warnings", {"tool": "plugins", "warning": f"Plugin directory not found: {plugin_dir}"}) - return - - for plugin_file in plugin_dir.glob("*.py"): - try: - spec = importlib.util.spec_from_file_location(plugin_file.stem, plugin_file) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - - if hasattr(module, "scan"): - result = module.scan(str(root)) - add("plugin_findings", { - "plugin": plugin_file.name, - "result": result - }) - else: - add("tool_warnings", { - "tool": "plugins", - "warning": f"{plugin_file.name} has no scan(root) function" - }) - except Exception as e: - add("scan_errors", { - "file": str(plugin_file), - "error": f"Plugin failed: {e}" - }) - - -def score_findings(): - weights = { - "private_keys": 35, - "credential_findings": 20, - "possible_password": 15, - "wifi_psk": 15, - "api_key": 15, - "jwt": 15, - "users_groups": 12, - "yara_matches": 35, - "malware_iocs": 30, - "interesting_binaries": 8, - "suspicious_keywords": 6, - "web_routes": 5, - "web_endpoints": 5, - "ssh_related": 5, - "startup_scripts": 5, - "cve_references": 5, - "cve_hints": 3, - } - - score = 0 - reasons = [] - - for category, weight in weights.items(): - count = len(REPORT.get(category, [])) - if count: - added = min(count * weight, weight * 6) - score += added - 
reasons.append(f"{category}: {count} finding(s), +{added}") - - critical = any(x.get("severity") == "critical" for x in REPORT.get("credential_findings", []) if isinstance(x, dict)) - malware = len(REPORT.get("malware_iocs", [])) > 0 - - if score >= 180 or critical or malware: - level = "CRITICAL" if critical or malware else "HIGH" - elif score >= 100: - level = "HIGH" - elif score >= 50: - level = "MEDIUM" - elif score > 0: - level = "LOW" - else: - level = "NONE" - - REPORT["risk_summary"] = [{ - "score": score, - "level": level, - "reasons": reasons, - "notes": [ - "Risk score is a triage aid, not proof of compromise.", - "Manually validate credentials, CVEs, and malware indicators." - ] - }] - - -# ============================================================================= -# REPORTING -# ============================================================================= - -def write_list_file(output_dir, filename, title, items): - path = output_dir / filename - - with open(path, "w", encoding="utf-8") as f: - f.write(title + "\n") - f.write("=" * 100 + "\n\n") - - if not items: - f.write("No findings.\n") - return path - - for item in items: - if isinstance(item, dict): - for k, v in item.items(): - f.write(f"{k}: {v}\n") - f.write("-" * 100 + "\n") - else: - f.write(str(item) + "\n") - - return path - - -def write_csv(output_dir): - csv_file = output_dir / "findings.csv" - rows = [] - - for title, key, _filename in IMPORTANT_SECTIONS: - for item in REPORT.get(key, []): - if isinstance(item, dict): - rows.append({ - "category": key, - "title": title, - "file": item.get("file", ""), - "value": item.get("value", item.get("route", item.get("component", ""))), - "severity": item.get("severity", ""), - "confidence": item.get("confidence", ""), - "details": json.dumps(item, ensure_ascii=False) - }) - else: - rows.append({ - "category": key, - "title": title, - "file": "", - "value": str(item), - "severity": "", - "confidence": "", - "details": str(item) - }) - - with 
open(csv_file, "w", newline="", encoding="utf-8") as f: - writer = csv.DictWriter(f, fieldnames=["category", "title", "file", "value", "severity", "confidence", "details"]) - writer.writeheader() - writer.writerows(rows) - - return csv_file - - -def write_markdown(output_dir): - md_file = output_dir / "firmware_report.md" - risk = REPORT.get("risk_summary", [{}])[0] - totals = REPORT.get("scan_totals", [{}])[0] - meta = REPORT.get("scan_metadata", [{}])[0] - - with open(md_file, "w", encoding="utf-8") as f: - f.write(f"# Firmware Hunter Pro v{VERSION} Report\n\n") - f.write("## Scan Info\n\n") - f.write(f"- Firmware root: `{meta.get('root', 'Unknown')}`\n") - f.write(f"- Started: `{meta.get('started', 'Unknown')}`\n") - f.write(f"- Finished: `{meta.get('finished', 'Unknown')}`\n") - f.write(f"- Total files: `{totals.get('total_files', 0)}`\n") - f.write(f"- Total size: `{totals.get('total_size_bytes', 0)}` bytes\n") - f.write(f"- Quick mode: `{meta.get('quick_mode', False)}`\n\n") - - f.write("## Risk Summary\n\n") - f.write(f"- Level: **{risk.get('level', 'UNKNOWN')}**\n") - f.write(f"- Score: **{risk.get('score', 0)}**\n\n") - - f.write("## Findings Overview\n\n") - f.write("| Finding | Count |\n|---|---:|\n") - for title, key, _ in IMPORTANT_SECTIONS: - count = len(REPORT.get(key, [])) - if count: - f.write(f"| {title} | {count} |\n") - - f.write("\n## Recommended Next Steps\n\n") - for step in recommended_steps(): - f.write(f"- {step}\n") - - return md_file - - -def write_html_report(output_dir): - html_file = output_dir / "firmware_report.html" - risk = REPORT.get("risk_summary", [{}])[0] - totals = REPORT.get("scan_totals", [{}])[0] - meta = REPORT.get("scan_metadata", [{}])[0] - - severity_color = { - "CRITICAL": "#ef4444", - "HIGH": "#f97316", - "MEDIUM": "#eab308", - "LOW": "#22c55e", - "NONE": "#94a3b8" - }.get(risk.get("level", "UNKNOWN"), "#94a3b8") - - with open(html_file, "w", encoding="utf-8") as f: - f.write(f""" - - - -Firmware Hunter Pro Report - 
- - - -
-

Firmware Hunter Pro v{VERSION}

-

Offline firmware triage report

-
-
-
-
Risk{html.escape(str(risk.get('level', 'UNKNOWN')))}
-
Risk Score{html.escape(str(risk.get('score', 0)))}
-
Total Files{html.escape(str(totals.get('total_files', 0)))}
-
Total Size{html.escape(str(totals.get('total_size_bytes', 0)))} bytes
-
- -
-

Scan Info

-

Firmware Root: {html.escape(str(meta.get('root', 'Unknown')))}

-

Started: {html.escape(str(meta.get('started', 'Unknown')))}

-

Finished: {html.escape(str(meta.get('finished', 'Unknown')))}

-

Quick Mode: {html.escape(str(meta.get('quick_mode', False)))}

-

Worker Jobs: {html.escape(str(meta.get('jobs', 'Unknown')))}

-
- -
-

Findings Overview

-""") - - for title, key, filename in IMPORTANT_SECTIONS: - count = len(REPORT.get(key, [])) - if count: - f.write(f"

{html.escape(title)}: {count} — {html.escape(filename)}

\n") - - f.write("""
-
-

Recommended Next Steps

-
    -""") - for step in recommended_steps(): - f.write(f"
  • {html.escape(step)}
  • \n") - - f.write("""
-
- -

Detailed Findings

- -""") - - for title, key, _filename in IMPORTANT_SECTIONS: - items = REPORT.get(key, []) - if not items: - continue - f.write(f"
{html.escape(title)} ({len(items)})\n") - f.write("
")
-            f.write(html.escape(json.dumps(items, indent=2, ensure_ascii=False)))
-            f.write("
\n") - - f.write("
def recommended_steps():
    """Return the fixed follow-up checklist shown at the end of every report.

    A new list is built on each call, so callers may modify the result
    without affecting later calls.
    """
    checklist = (
        "Review credential_findings.txt, possible_passwords.txt, and wifi_psks.txt for hardcoded credentials.",
        "Review startup_scripts.txt to understand what starts at boot.",
        "Review web_routes.txt and web_endpoints.txt for login panels, CGI handlers, firmware update routes, and debug routes.",
        "Review interesting_binaries.txt for telnet, dropbear, busybox, wget, tftp, netcat, and web servers.",
        "Review private_keys.txt and certificates.txt for exposed secrets or reused keys.",
        "Review components.txt and cve_hints.txt, then verify versions against authoritative vulnerability databases.",
        "Review malware_iocs.txt and yara_matches.txt, then manually validate before making conclusions.",
        "Use full_report.json or findings.csv for automation, diffing, or importing into other tools.",
    )
    return list(checklist)
def _write_summary_file(summary_file, timestamp, meta, totals, risk):
    """Write the short plain-text summary: scan info, risk, per-section counts."""
    with open(summary_file, "w", encoding="utf-8") as f:
        f.write("Firmware Hunter Pro Summary\n")
        f.write("=" * 100 + "\n\n")
        f.write(f"Scan Time: {timestamp}\n")
        f.write(f"Firmware Root: {meta.get('root', 'Unknown')}\n")
        f.write(f"Quick Mode: {meta.get('quick_mode', False)}\n")
        f.write(f"Jobs: {meta.get('jobs', 'Unknown')}\n")
        f.write(f"Total Files: {totals.get('total_files', 0)}\n")
        f.write(f"Total Size: {totals.get('total_size_bytes', 0)} bytes\n\n")

        f.write("Risk Summary\n")
        f.write("-" * 100 + "\n")
        f.write(f"Risk Level: {risk.get('level', 'UNKNOWN')}\n")
        f.write(f"Risk Score: {risk.get('score', 0)}\n\n")

        f.write("Finding Counts\n")
        f.write("-" * 100 + "\n")
        for title, key, _ in IMPORTANT_SECTIONS:
            count = len(REPORT.get(key, []))
            if count:
                f.write(f"{title:<35} {count}\n")


def _write_main_report(main_report, output_dir, meta, totals, risk):
    """Write the full plain-text report, ending with a listing of *output_dir*."""
    with open(main_report, "w", encoding="utf-8") as f:
        f.write("Firmware Hunter Pro Report\n")
        f.write("=" * 100 + "\n")
        f.write("Offline firmware triage report\n")
        f.write("=" * 100 + "\n\n")

        f.write("[SCAN INFO]\n")
        f.write(f"Firmware Root : {meta.get('root', 'Unknown')}\n")
        f.write(f"Started : {meta.get('started', 'Unknown')}\n")
        f.write(f"Finished : {meta.get('finished', 'Unknown')}\n")
        f.write(f"Quick Mode : {meta.get('quick_mode', False)}\n")
        f.write(f"Worker Jobs : {meta.get('jobs', 'Unknown')}\n")
        f.write(f"Total Files : {totals.get('total_files', 0)}\n")
        f.write(f"Total Size : {totals.get('total_size_bytes', 0)} bytes\n\n")

        f.write("[RISK SUMMARY]\n")
        f.write(f"Level : {risk.get('level', 'UNKNOWN')}\n")
        f.write(f"Score : {risk.get('score', 0)}\n\n")

        reasons = risk.get("reasons")
        if reasons:
            f.write("Reasons:\n")
            for reason in reasons:
                f.write(f" - {reason}\n")
            f.write("\n")

        f.write("[FINDINGS OVERVIEW]\n")
        for title, key, filename in IMPORTANT_SECTIONS:
            count = len(REPORT.get(key, []))
            if count:
                f.write(f"[+] {title:<35} {count:<5} -> {filename}\n")
            else:
                f.write(f"[-] {title:<35} 0\n")

        f.write("\n[RECOMMENDED NEXT STEPS]\n")
        f.write("-" * 100 + "\n")
        for idx, step in enumerate(recommended_steps(), 1):
            f.write(f"{idx}. {step}\n")

        # Listed while this file is still open: it already exists on disk,
        # so it appears in its own "generated files" section.
        f.write("\n[GENERATED FILES]\n")
        for file in sorted(output_dir.iterdir()):
            f.write(f"- {file.name}\n")


def write_reports(base_output):
    """Write every report artefact for the current scan and print their paths.

    Creates ``<base_output>/scan_<timestamp>/`` and fills it, in order, with
    the JSON dump of ``REPORT``, one evidence file per non-empty entry of
    ``IMPORTANT_SECTIONS``, the CSV, Markdown and HTML reports, the short
    summary and the main text report.

    Args:
        base_output: Base directory under which the timestamped scan
            folder is created.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_dir = Path(base_output) / f"scan_{timestamp}"
    output_dir.mkdir(parents=True, exist_ok=True)

    json_file = output_dir / "full_report.json"
    main_report = output_dir / "firmware_report.txt"
    summary_file = output_dir / "summary.txt"

    # Dump a plain dict rather than the defaultdict itself.
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(dict(REPORT), f, indent=4, ensure_ascii=False)

    # REPORT is a defaultdict(list): a section that was only ever looked up
    # exists as an empty list, so ``REPORT.get(key, [{}])[0]`` would raise
    # IndexError. Fall back to an empty record instead.
    risk = (REPORT.get("risk_summary") or [{}])[0]
    totals = (REPORT.get("scan_totals") or [{}])[0]
    meta = (REPORT.get("scan_metadata") or [{}])[0]

    for title, key, filename in IMPORTANT_SECTIONS:
        items = REPORT.get(key, [])
        if items:
            write_list_file(output_dir, filename, title, items)

    csv_file = write_csv(output_dir)
    md_file = write_markdown(output_dir)
    html_file = write_html_report(output_dir)

    _write_summary_file(summary_file, timestamp, meta, totals, risk)
    # Written last so its file listing includes everything produced above.
    _write_main_report(main_report, output_dir, meta, totals, risk)

    print()
    print("=" * 100)
    print(" Firmware Hunter Pro Scan Complete")
    print("=" * 100)
    print(f" Output Folder : {output_dir}")
    print(f" Main Report : {main_report}")
    print(f" Summary : {summary_file}")
    print(f" HTML Report : {html_file}")
    print(f" Markdown : {md_file}")
    print(f" CSV Findings : {csv_file}")
    print(f" JSON Report : {json_file}")
    print("=" * 100)
def scan_firmware(root, yara_rules=None, quick=False, jobs=8, plugins=None):
    """Scan an extracted firmware tree and populate the module-level REPORT.

    Walks *root*, submits every regular file to ``scan_file`` on a thread
    pool, then runs the post-processing passes (identity detection,
    architecture and largest-file summaries, plugins, scoring) and records
    totals and start/finish timestamps in ``REPORT``.

    Args:
        root: Path to the extracted firmware directory / rootfs.
        yara_rules: Optional YARA rule path, forwarded to ``scan_file``.
        quick: Quick-mode flag, forwarded to ``scan_file``.
        jobs: Requested worker thread count; values below 1 are raised to 1.
        plugins: Optional plugins directory, forwarded to ``run_plugins``.

    Raises:
        FileNotFoundError: If *root* does not exist.
        NotADirectoryError: If *root* exists but is not a directory.
    """
    root = Path(root).resolve()

    if not root.exists():
        raise FileNotFoundError(f"Firmware path does not exist: {root}")

    if not root.is_dir():
        raise NotADirectoryError(f"Firmware path should be an extracted directory/rootfs: {root}")

    files = []
    total_size = 0

    for dirpath, _, filenames in os.walk(root):
        for filename in filenames:
            path = Path(dirpath) / filename
            if not path.is_file():
                continue
            files.append(path)
            try:
                total_size += path.stat().st_size
            except OSError:
                # Best effort: a file whose size cannot be read is still
                # scanned, it just does not count towards the size total.
                pass

    # Clamp once so the pool, the metadata and the console output agree on
    # the number of workers actually used.
    workers = max(1, jobs)

    REPORT["scan_metadata"] = [{
        "tool": "Firmware Hunter Pro",
        "version": VERSION,
        "root": str(root),
        "started": now_iso(),
        "quick_mode": quick,
        "jobs": workers,
        "yara_rules": str(yara_rules) if yara_rules else None,
        "plugins": str(plugins) if plugins else None
    }]

    print(f"[+] Files discovered: {len(files)}")
    print(f"[+] Total size: {total_size} bytes")
    print(f"[+] Worker threads: {workers}")

    start_time = time.time()

    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Remember which path each future belongs to so that a failure can
        # be attributed to its file instead of being logged as "unknown".
        future_to_path = {
            executor.submit(scan_file, path, root, yara_rules, quick): path
            for path in files
        }

        for completed, future in enumerate(as_completed(future_to_path), 1):
            try:
                future.result()
            except Exception as e:
                # Boundary handler: one broken file must not abort the scan.
                add("scan_errors", {"file": str(future_to_path[future]), "error": str(e)})

            if completed % 500 == 0:
                # Floor at one second to avoid inflated rates early on.
                elapsed = max(time.time() - start_time, 1)
                rate = completed / elapsed
                print(f"[+] Progress: {completed}/{len(files)} files scanned ({rate:.1f} files/sec)")

    detect_firmware_identity(root)
    summarize_architecture()
    summarize_largest_files(root)
    run_plugins(plugins, root)
    score_findings()

    REPORT["scan_totals"] = [{
        "total_files": len(files),
        "total_size_bytes": total_size,
        "elapsed_seconds": round(time.time() - start_time, 2)
    }]

    REPORT["scan_metadata"][0]["finished"] = now_iso()
def main():
    """Command-line entry point: parse arguments, scan, and write reports.

    With ``--extract`` the target is first passed to
    ``extract_with_binwalk`` and the returned path is scanned; otherwise the
    target itself is scanned. A missing or non-directory scan root ends the
    program with an error message and a non-zero exit status instead of a
    traceback.
    """
    parser = argparse.ArgumentParser(
        # Derived from VERSION so the help text cannot drift from the banner.
        description=f"Firmware Hunter Pro v{VERSION} - advanced offline firmware triage scanner"
    )

    parser.add_argument("target", help="Path to extracted firmware/rootfs directory, or firmware image with --extract")
    parser.add_argument("-o", "--output", default="firmware_hunter_output", help="Base output directory")
    parser.add_argument("--extract", action="store_true", help="Use binwalk to extract target first, then scan extraction")
    parser.add_argument("--yara", help="Optional path to YARA rule file or directory")
    parser.add_argument("--quick", action="store_true", help="Quick mode: skip files larger than 10 MB")
    parser.add_argument("-j", "--jobs", type=int, default=8, help="Number of worker threads, default: 8")
    parser.add_argument("--plugins", help="Optional plugins directory. Each plugin should expose scan(root).")

    args = parser.parse_args()

    print(f"[+] Firmware Hunter Pro v{VERSION}")
    print("[+] Safe mode: this tool does not execute firmware binaries.")

    check_dependencies(yara_requested=bool(args.yara), extract_requested=args.extract)

    target = Path(args.target).resolve()

    if args.extract:
        scan_root = extract_with_binwalk(target, args.output)
    else:
        scan_root = target

    print(f"[+] Scanning: {scan_root}")

    try:
        scan_firmware(
            scan_root,
            yara_rules=args.yara,
            quick=args.quick,
            jobs=args.jobs,
            plugins=args.plugins
        )
    except (FileNotFoundError, NotADirectoryError) as exc:
        # These carry user-facing messages about the scan root; report them
        # on stderr with exit status 1 rather than dumping a traceback.
        sys.exit(f"[-] {exc}")

    write_reports(args.output)


if __name__ == "__main__":
    main()