diff --git a/firmware_hunter_pro_v4.py b/firmware_hunter_pro_v4.py new file mode 100644 index 0000000..ac0d781 --- /dev/null +++ b/firmware_hunter_pro_v4.py @@ -0,0 +1,1387 @@ +#!/usr/bin/env python3 +""" +Firmware Hunter Pro v4.0 +A safer offline firmware triage framework for extracted router / IoT firmware. + +What it does: +- Scans extracted firmware directories safely without executing firmware binaries. +- Optionally extracts a firmware image with binwalk if binwalk is installed. +- Finds credentials, keys, certs, IPs, domains, URLs, MACs, JWTs, API keys, hashes, CVEs. +- Maps web UI files, CGI handlers, forms, scripts, endpoints, and admin-looking routes. +- Detects ELF binaries, architecture hints, BusyBox, Linux kernel, OpenSSL, Dropbear, dnsmasq, uClibc/musl/glibc. +- Scores findings with severity and confidence. +- Produces TXT, JSON, HTML, Markdown, CSV, and separate evidence files. +- Supports simple external plugins from a plugins directory. + +Safe by default: +- Does not execute firmware binaries. +- Only reads files and optionally runs host tools like strings, file, binwalk, yara. 
+ +Usage: + python3 firmware_hunter_pro.py /path/to/squashfs-root + python3 firmware_hunter_pro.py firmware.bin --extract + python3 firmware_hunter_pro.py /path/to/rootfs --quick + python3 firmware_hunter_pro.py /path/to/rootfs --yara rules.yar + python3 firmware_hunter_pro.py /path/to/rootfs --plugins plugins/ +""" + +import os +import re +import csv +import sys +import json +import math +import time +import html +import shutil +import hashlib +import argparse +import tempfile +import subprocess +import importlib.util +from pathlib import Path +from datetime import datetime +from collections import Counter, defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Lock + +VERSION = "4.0" +REPORT = defaultdict(list) +REPORT_LOCK = Lock() + +# ============================================================================= +# CONFIG +# ============================================================================= + +TEXT_EXTENSIONS = { + ".txt", ".log", ".conf", ".cfg", ".ini", ".json", ".xml", ".html", ".htm", + ".js", ".php", ".asp", ".cgi", ".lua", ".sh", ".service", ".default", + ".profile", ".passwd", ".shadow", ".pem", ".key", ".crt", ".pub", ".yaml", + ".yml", ".env", ".properties", ".rc", ".rules" +} + +INTERESTING_NAMES = { + "passwd", "shadow", "group", "hosts", "resolv.conf", "inittab", "fstab", + "rcS", "rc.local", "profile", "authorized_keys", "known_hosts", "motd", + "issue", "services", "inetd", "inetd.conf", "udhcpd.conf", "dnsmasq.conf", + "dropbear", "telnetd", "lighttpd.conf", "boa.conf", "httpd.conf", + "mini_httpd.conf", "uhttpd.conf", "nginx.conf", "config.xml", "nvram", + "default.cfg", "wpa_supplicant.conf", "os-release", "version", "release", + "syslog.conf", "crontab" +} + +EXECUTABLE_NAMES = { + "busybox", "telnetd", "dropbear", "sshd", "httpd", "boa", "lighttpd", + "mini_httpd", "uhttpd", "nginx", "nc", "netcat", "wget", "curl", "tftp", + "ftpget", "ftpput", "iptables", "ip6tables", "dnsmasq", 
"udhcpd", "pppd", + "openvpn", "openssl", "sqlite3", "ash", "sh" +} + +SUSPICIOUS_KEYWORDS = { + "telnetd", "dropbear", "busybox telnet", "nc -l", "netcat", "/bin/sh", + "/bin/ash", "reverse shell", "backdoor", "wget http", "curl http", "tftp", + "ftpget", "ftpput", "chmod 777", "0.0.0.0", "admin:admin", "root:root", + "password", "passwd", "shadow", "debug", "test", "factory", "developer", + "enable telnet", "remote shell", "hardcoded", "debug shell", "diagnostic", + "support account", "superuser", "hidden", "maintenance" +} + +MALWARE_IOC_KEYWORDS = { + "mirai": ["mirai", "busybox MIRAI", "/bin/busybox", "report.%s", "scanListen"], + "gafgyt/bashlite": ["gafgyt", "bashlite", "gayfgt", "loligang", "telnet scanner"], + "mozi": ["mozi", "Mozi.m", "dht.transmissionbt.com", "router.bittorrent.com"], + "xorddos": ["xorddos", "x0r", "/tmp/.x", "BB2FA36AAA9541F0"], + "miner": ["stratum+tcp", "xmrig", "minerd", "cryptonight", "monero"], + "generic_bot": ["CNC", "C2", "botnet", "udp flood", "syn flood", "http flood"] +} + +REGEX_PATTERNS = { + "ipv4": r"\b(?:\d{1,3}\.){3}\d{1,3}\b", + "mac_address": r"\b(?:[0-9a-fA-F]{2}[:-]){5}[0-9a-fA-F]{2}\b", + "url": r"https?://[^\s'\"<>]{4,}", + "email": r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+", + "domain": r"\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|cn|ru|info|biz|us|uk|co|dev|cloud|local|lan)\b", + "md5": r"\b[a-fA-F0-9]{32}\b", + "sha1": r"\b[a-fA-F0-9]{40}\b", + "sha256": r"\b[a-fA-F0-9]{64}\b", + "jwt": r"eyJ[a-zA-Z0-9_\-]+\.[a-zA-Z0-9_\-]+\.[a-zA-Z0-9_\-]+", + "api_key": r"(?i)(api[_-]?key|apikey|access[_-]?key|secret[_-]?key|client_secret)\s*[:=]\s*['\"]?([a-zA-Z0-9_\-]{12,})", + "wifi_psk": r"(?i)(passphrase|psk|wpa_pass|wifi_pass|wifi_password|wireless_key|wpakey)\s*[:=]\s*['\"]?([^\s'\";&]{8,63})", + "possible_password": r"(?i)(password|passwd|pwd|pass|admin_pass|root_pass|web_pass|wifi_pass|secret|token)\s*[:=]\s*['\"]?([^\s'\";&]{4,})", + "possible_username": 
r"(?i)(username|user|login|admin_user|root_user)\s*[:=]\s*['\"]?([^\s'\";&]{3,})", + "mqtt_credential": r"(?i)(mqtt_(?:user|username|pass|password)|mqttUser|mqttPass)\s*[:=]\s*['\"]?([^\s'\";&]{3,})", + "basic_auth": r"(?i)authorization:\s*basic\s+([a-zA-Z0-9+/=]{8,})", +} + +PRIVATE_KEY_MARKERS = { + "-----BEGIN RSA PRIVATE KEY-----", + "-----BEGIN DSA PRIVATE KEY-----", + "-----BEGIN EC PRIVATE KEY-----", + "-----BEGIN OPENSSH PRIVATE KEY-----", + "-----BEGIN PRIVATE KEY-----" +} + +CERT_MARKERS = {"-----BEGIN CERTIFICATE-----"} + +COMPONENT_PATTERNS = { + "busybox": [ + r"BusyBox v([\w.\-]+)", + r"busybox\s+v([\w.\-]+)" + ], + "linux_kernel": [ + r"Linux version ([\w.\-]+)", + r"kernel version[:= ]+([\w.\-]+)" + ], + "openssl": [ + r"OpenSSL\s+([0-9][\w.\-]+[a-z]?)", + r"openssl-([0-9][\w.\-]+)" + ], + "dropbear": [ + r"Dropbear sshd v?([0-9][\w.\-]+)", + r"dropbear[_ -]?([0-9]{4}\.[0-9]{2})" + ], + "dnsmasq": [ + r"dnsmasq-?([0-9][\w.\-]+)", + r"dnsmasq version ([0-9][\w.\-]+)" + ], + "uClibc": [ + r"uClibc-?([0-9][\w.\-]+)" + ], + "musl": [ + r"musl-?([0-9][\w.\-]+)" + ], + "glibc": [ + r"GNU C Library.*?release version ([0-9][\w.\-]+)", + r"GLIBC_([0-9.]+)" + ], + "lighttpd": [ + r"lighttpd/([0-9][\w.\-]+)" + ], + "boa": [ + r"Boa/([0-9][\w.\-]+)" + ], + "uhttpd": [ + r"uhttpd[-/ ]([0-9][\w.\-]+)" + ] +} + +CVE_HINTS = { + "busybox": { + "note": "BusyBox is frequently old in embedded firmware. Verify exact version against NVD/vendor advisories.", + "keywords": ["busybox", "ash", "udhcp", "telnetd"] + }, + "openssl": { + "note": "OpenSSL version detected. Check for old TLS/crypto CVEs and weak certificate/key usage.", + "keywords": ["openssl", "libssl", "libcrypto"] + }, + "dropbear": { + "note": "Dropbear SSH detected. Check version against Dropbear security advisories.", + "keywords": ["dropbear", "sshd"] + }, + "dnsmasq": { + "note": "dnsmasq detected. 
Check version for DNS/DHCP vulnerabilities.", + "keywords": ["dnsmasq"] + }, + "boa": { + "note": "Boa web server is commonly outdated in IoT firmware.", + "keywords": ["boa"] + } +} + +IMPORTANT_SECTIONS = [ + ("Credential Findings", "credential_findings", "credential_findings.txt"), + ("Possible Passwords", "possible_password", "possible_passwords.txt"), + ("Wi-Fi PSKs", "wifi_psk", "wifi_psks.txt"), + ("Possible Usernames", "possible_username", "possible_usernames.txt"), + ("API Keys / Secrets", "api_key", "api_keys.txt"), + ("MQTT Credentials", "mqtt_credential", "mqtt_credentials.txt"), + ("JWT Tokens", "jwt", "jwt_tokens.txt"), + ("Private Keys", "private_keys", "private_keys.txt"), + ("Certificates", "certificates", "certificates.txt"), + ("IP Addresses", "ipv4", "ip_addresses.txt"), + ("MAC Addresses", "mac_address", "mac_addresses.txt"), + ("URLs", "url", "urls.txt"), + ("Domains", "domain", "domains.txt"), + ("Emails", "email", "emails.txt"), + ("Component Versions", "components", "components.txt"), + ("Version Strings", "version_strings", "version_strings.txt"), + ("Firmware Identity", "firmware_identity", "firmware_identity.txt"), + ("Startup Scripts", "startup_scripts", "startup_scripts.txt"), + ("Interesting Files", "interesting_files", "interesting_files.txt"), + ("Interesting Binaries", "interesting_binaries", "interesting_binaries.txt"), + ("ELF Binaries", "elf_binaries", "elf_binaries.txt"), + ("Architecture Summary", "architecture_summary", "architecture_summary.txt"), + ("Web Files", "web_files", "web_files.txt"), + ("Web Routes", "web_routes", "web_routes.txt"), + ("Web Endpoints", "web_endpoints", "web_endpoints.txt"), + ("Cron Jobs", "cron_jobs", "cron_jobs.txt"), + ("Users / Groups", "users_groups", "users_groups.txt"), + ("SSH Related Files", "ssh_related", "ssh_related.txt"), + ("Suspicious Keywords", "suspicious_keywords", "suspicious_keywords.txt"), + ("Malware IOC Matches", "malware_iocs", "malware_iocs.txt"), + ("High Entropy Files", 
def add(category, data):
    """Append *data* to ``REPORT[category]`` unless an equal entry is present.

    The membership check and the append both run while ``REPORT_LOCK`` is
    held, so the check-then-append pair cannot be interleaved with another
    call to this function.
    """
    with REPORT_LOCK:
        bucket = REPORT[category]
        if data in bucket:
            return
        bucket.append(data)


def safe_rel(path, root):
    """Return *path* relative to *root* (both resolved) as a string.

    If anything goes wrong -- most commonly *path* not living under *root* --
    the function falls back to ``str(path)``.
    """
    try:
        resolved_path = Path(path).resolve()
        resolved_root = Path(root).resolve()
        relative = resolved_path.relative_to(resolved_root)
    except Exception:
        return str(path)
    return str(relative)


def now_iso():
    """Return the current local time as an ISO-8601 string, second precision."""
    return datetime.now().replace(microsecond=0).isoformat()


def check_dependencies(yara_requested=False, extract_requested=False):
    """Warn about host tools that are needed but not available on PATH.

    ``strings`` is always checked; ``yara`` and ``binwalk`` are checked only
    when the corresponding feature was requested. Every missing tool is
    recorded under ``tool_warnings`` and also printed.
    """
    optional_tools = (("yara", yara_requested), ("binwalk", extract_requested))
    wanted = ["strings"] + [name for name, requested in optional_tools if requested]
    missing = [tool for tool in wanted if shutil.which(tool) is None]

    for tool in missing:
        add("tool_warnings", {
            "tool": tool,
            "warning": f"'{tool}' was not found in PATH. Some features may be limited."
        })
        print(f"[!] Warning: '{tool}' not found. Some features may be limited.")
def sha256_file(path):
    """Return the SHA-256 hex digest of the file at *path*, or None on error.

    The file is read in 1 MiB chunks so large images are never held in
    memory in one piece.
    """
    try:
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                digest.update(chunk)
        return digest.hexdigest()
    except Exception:
        return None


def entropy_file(path, max_bytes=1024 * 1024):
    """Return the Shannon entropy (bits per byte) of the first *max_bytes*.

    Returns 0.0 for an empty file and None when the file cannot be read.
    The result is rounded to four decimal places.
    """
    try:
        with open(path, "rb") as f:
            data = f.read(max_bytes)
        if not data:
            return 0.0
        length = len(data)
        entropy = -sum(
            (count / length) * math.log2(count / length)
            for count in Counter(data).values()
        )
        return round(entropy, 4)
    except Exception:
        return None


def is_text_file(path):
    """Guess whether *path* (a ``pathlib.Path``) holds text.

    A suffix listed in ``TEXT_EXTENSIONS`` is accepted without looking at the
    content. Otherwise the first 4096 bytes are sampled and the file counts
    as text when fewer than 8% of them are NUL bytes. Empty or unreadable
    files are reported as not text.
    """
    if path.suffix.lower() in TEXT_EXTENSIONS:
        return True
    try:
        with open(path, "rb") as f:
            sample = f.read(4096)
        if not sample:
            return False
        return sample.count(b"\x00") / len(sample) < 0.08
    except Exception:
        return False


def read_text(path, max_chars=None):
    """Return the text of *path*, optionally limited to *max_chars* characters.

    Undecodable bytes are dropped and any read failure yields ``""``.
    When *max_chars* is given only that many characters are read, instead of
    decoding the whole file and slicing afterwards. The file is decoded as
    UTF-8 explicitly so results do not depend on the host locale.
    """
    try:
        with path.open("r", encoding="utf-8", errors="ignore") as f:
            # A falsy max_chars (None or 0) means "no limit", as before.
            return f.read(max_chars) if max_chars else f.read()
    except Exception:
        return ""


def is_elf(path):
    """Return True when the file starts with the ELF magic bytes."""
    try:
        with open(path, "rb") as f:
            return f.read(4) == b"\x7fELF"
    except Exception:
        return False


def elf_info(path):
    """Decode class, endianness and machine type from an ELF header.

    Only the first 20 bytes are read. Returns a dict with the keys
    ``class``, ``endian`` and ``machine``, or ``{}`` when the file is shorter
    than 20 bytes or cannot be read. The magic bytes are not checked here.
    """
    arch_map = {
        3: "x86",
        8: "MIPS",
        20: "PowerPC",
        40: "ARM",
        62: "x86_64",
        183: "AArch64",
        243: "RISC-V"
    }
    class_names = {1: "32-bit", 2: "64-bit"}
    endian_names = {1: "little", 2: "big"}

    try:
        with open(path, "rb") as f:
            header = f.read(20)

        if len(header) < 20:
            return {}

        elf_class = header[4]
        endian = header[5]
        # An unrecognised endianness byte is decoded as little-endian.
        byte_order = "big" if endian == 2 else "little"
        machine = int.from_bytes(header[18:20], byte_order)

        return {
            "class": class_names.get(elf_class, "unknown"),
            "endian": endian_names.get(endian, "unknown"),
            "machine": arch_map.get(machine, f"unknown-{machine}")
        }
    except Exception:
        return {}


def run_strings(path, limit=1200):
    """Run the host ``strings`` tool on *path* and return up to *limit* lines.

    Any failure (tool missing, timeout after 20 seconds, ...) yields ``[]``.
    """
    try:
        result = subprocess.run(
            ["strings", str(path)],
            capture_output=True,
            text=True,
            timeout=20,
            errors="ignore"
        )
        return result.stdout.splitlines()[:limit]
    except Exception:
        return []
def run_file_cmd(path):
    """Return the one-line description printed by ``file -b`` for *path*.

    Returns None when the ``file`` tool is not on PATH or the call fails.
    """
    if shutil.which("file") is None:
        return None
    try:
        result = subprocess.run(
            ["file", "-b", str(path)],
            capture_output=True,
            text=True,
            timeout=5,
            errors="ignore"
        )
        return result.stdout.strip()
    except Exception:
        return None


# =============================================================================
# EXTRACTION
# =============================================================================

def extract_with_binwalk(firmware_image, output_base):
    """Extract *firmware_image* with binwalk and return the directory to scan.

    A timestamped ``binwalk_extract_*`` directory is created under
    *output_base*. After extraction the best-scoring rootfs candidate is
    returned; when none is found the extraction directory itself is returned.

    Raises:
        RuntimeError: binwalk is not available on PATH. The check happens
            before any directory is created, so a failed call leaves nothing
            behind on disk.
    """
    if shutil.which("binwalk") is None:
        raise RuntimeError("binwalk not found. Install it or scan an already extracted rootfs directory.")

    firmware_image = Path(firmware_image).resolve()
    extract_dir = Path(output_base).resolve() / f"binwalk_extract_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
    extract_dir.mkdir(parents=True, exist_ok=True)

    print(f"[+] Extracting with binwalk into: {extract_dir}")
    try:
        completed = subprocess.run(
            ["binwalk", "-Me", str(firmware_image), "--directory", str(extract_dir)],
            check=False,
            timeout=1800
        )
        # A failed extraction is not fatal (partial output may still be
        # useful), but it should be visible in the report.
        if completed.returncode != 0:
            add("tool_warnings", {
                "tool": "binwalk",
                "warning": f"binwalk exited with status {completed.returncode}"
            })
    except subprocess.TimeoutExpired:
        add("tool_warnings", {"tool": "binwalk", "warning": "binwalk extraction timed out"})
    except Exception as e:
        add("tool_warnings", {"tool": "binwalk", "warning": f"binwalk extraction failed: {e}"})

    candidates = find_rootfs_candidates(extract_dir)
    if candidates:
        print("[+] Possible rootfs candidates:")
        for idx, cand in enumerate(candidates[:10], 1):
            print(f" {idx}. {cand}")
        return candidates[0]

    print("[!] No obvious rootfs found. Scanning extraction directory instead.")
    return extract_dir
def find_rootfs_candidates(base_dir):
    """Return directories under *base_dir* that look like a Linux root filesystem.

    Each directory is scored by the entry names (files or sub-directories) it
    contains; directories scoring at least 5 are returned, highest score
    first. Directories with equal scores keep the order ``os.walk`` visited
    them in.
    """
    # (entry names, points awarded when any of them is present)
    markers = (
        ({"etc"}, 3),
        ({"bin"}, 2),
        ({"sbin"}, 2),
        ({"www", "web", "htdocs"}, 2),
        ({"passwd", "shadow"}, 2),
        ({"init.d"}, 2),
    )

    scored = []
    for dirpath, dirnames, filenames in os.walk(Path(base_dir)):
        entries = set(filenames).union(dirnames)
        total = sum(points for group, points in markers if entries & group)
        if total >= 5:
            scored.append((total, Path(dirpath)))

    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [directory for _, directory in scored]


# =============================================================================
# SCANNERS
# =============================================================================

def extract_version_strings(text):
    """Collect version-like strings from *text*.

    For every pattern the last capture group of each match is taken,
    whitespace runs are collapsed to single spaces, and values between 3 and
    120 characters long are kept. Returns at most 40 unique values, sorted.
    """
    patterns = (
        r"(?i)(version|build|release|fw_ver|firmware|software|sw_ver|hardware|hw_ver|model|vendor|product)\s*[:=]\s*['\"]?([\w.\-/ ]{3,80})",
        r"(?i)(busybox\s+v\d{1,3}\.\d{1,3}(?:\.\d{1,5})?)",
        r"(?i)(linux\s+version\s+[\w.\-]+)",
        r"\bV(\d{1,3}\.\d{1,3}\.\d{1,5})\b",
        r"\b(\d{4}-\d{2}-\d{2})\b",
    )

    collected = set()
    for pattern in patterns:
        for found in re.finditer(pattern, text):
            # found.re.groups is the index of the pattern's last group.
            candidate = " ".join(found.group(found.re.groups).split())
            if 3 <= len(candidate) <= 120:
                collected.add(candidate)

    return sorted(collected)[:40]


def detect_components(text, rel_path):
    """Record component versions found in *text* under ``components``.

    Every pattern in ``COMPONENT_PATTERNS`` is matched case-insensitively.
    Components that also appear in ``CVE_HINTS`` additionally get a
    ``cve_hints`` entry carrying that component's note.
    """
    for component, patterns in COMPONENT_PATTERNS.items():
        hint = CVE_HINTS.get(component) if component in CVE_HINTS else None
        for pattern in patterns:
            for match in re.findall(pattern, text, re.I):
                raw = match[-1] if isinstance(match, tuple) else match
                version = str(raw).strip()
                if not version:
                    continue
                add("components", {
                    "component": component,
                    "version": version,
                    "file": rel_path
                })
                if hint is not None:
                    add("cve_hints", {
                        "component": component,
                        "version": version,
                        "file": rel_path,
                        "note": hint["note"]
                    })
def detect_malware_iocs(text, rel_path):
    """Record malware-family indicator strings found in *text*.

    Matching is a case-insensitive substring test against the indicator
    lists in ``MALWARE_IOC_KEYWORDS``. One ``malware_iocs`` entry is added
    per family with at least one hit; confidence is "medium" for a single
    hit and "high" otherwise.
    """
    haystack = text.lower()
    for family, indicators in MALWARE_IOC_KEYWORDS.items():
        matched = [needle for needle in indicators if needle.lower() in haystack]
        if not matched:
            continue
        add("malware_iocs", {
            "family_or_category": family,
            "file": rel_path,
            "indicators": sorted(set(matched)),
            "confidence": "high" if len(matched) != 1 else "medium"
        })


def scan_text_content(path, content, rel_path):
    """Run all text-based detectors over *content* and record the findings.

    Covers the ``REGEX_PATTERNS`` table, private-key and certificate markers,
    suspicious keywords, version strings, component versions and malware
    indicators. *path* is accepted for interface symmetry and not used here.
    """
    credential_kinds = {"possible_password", "wifi_psk", "api_key", "mqtt_credential"}

    for name, pattern in REGEX_PATTERNS.items():
        for hit in re.findall(pattern, content):
            # Multi-group patterns yield (key, value) tuples; single-group or
            # group-less patterns yield a plain string.
            if isinstance(hit, tuple):
                candidate = hit[1]
                finding = {"file": rel_path, "key": hit[0], "value": candidate}
            else:
                candidate = hit
                finding = {"file": rel_path, "value": hit}

            # The ipv4 regex accepts octets up to 999; drop impossible ones.
            if name == "ipv4" and not all(
                octet.isdigit() and 0 <= int(octet) <= 255
                for octet in candidate.split(".")
            ):
                continue

            add(name, finding)
            if name in credential_kinds:
                add_credential_finding(name, finding)

    for marker in PRIVATE_KEY_MARKERS:
        if marker not in content:
            continue
        add("private_keys", {"file": rel_path, "marker": marker})
        add("credential_findings", {
            "file": rel_path,
            "type": "private_key",
            "value": marker,
            "confidence": "high",
            "severity": "critical",
            "reason": "Private key material marker found"
        })

    for marker in CERT_MARKERS:
        if marker in content:
            add("certificates", {"file": rel_path, "marker": marker})

    folded = content.lower()
    for keyword in SUSPICIOUS_KEYWORDS:
        if keyword.lower() in folded:
            add("suspicious_keywords", {"file": rel_path, "keyword": keyword})

    found_versions = extract_version_strings(content)
    if found_versions:
        add("version_strings", {"file": rel_path, "versions": found_versions})

    detect_components(content, rel_path)
    detect_malware_iocs(content, rel_path)
def add_credential_finding(kind, finding):
    """Score a credential-like *finding* and record it under ``credential_findings``.

    Confidence and severity start at "medium" and are raised to "high" when
    the pattern kind is ``api_key`` or ``wifi_psk``, when the key name is
    credential-like, or when the value is a common default. A sensitive
    looking file path raises confidence only. The collected reasons are
    joined with "; ".
    """
    value = str(finding.get("value", ""))
    key = str(finding.get("key", ""))
    file_path = finding.get("file", "")

    confidence = "medium"
    severity = "medium"
    reasons = []

    if kind in {"api_key", "wifi_psk"}:
        confidence = severity = "high"
        reasons.append(f"{kind} pattern matched")

    if key.lower() in {"password", "passwd", "pwd", "admin_pass", "root_pass", "secret", "token"}:
        confidence = severity = "high"
        reasons.append("credential-like key name")

    if value.lower() in {"admin", "root", "password", "123456", "12345678", "admin123", "root123"}:
        confidence = severity = "high"
        reasons.append("common/default credential value")

    path_lower = file_path.lower()
    if any(hint in path_lower for hint in ("shadow", "passwd", "default", "config", "nvram", "wpa")):
        reasons.append("sensitive config path")
        confidence = "high"

    add("credential_findings", {
        "file": file_path,
        "type": kind,
        "key": key,
        "value": value,
        "confidence": confidence,
        "severity": severity,
        "reason": "; ".join(reasons) if reasons else "credential-like pattern matched"
    })


def classify_path(path, root, size, rel_path):
    """File *path* into report categories based on its name and location.

    Args:
        path: ``pathlib.Path`` of the file on the host.
        root: firmware root directory (kept for interface compatibility).
        size: file size in bytes, stored in the report entry.
        rel_path: path of the file relative to the firmware root.

    Location heuristics are evaluated against the firmware-relative path, not
    the absolute host path, so the directory the firmware happens to be
    unpacked in (for example ``/var/www/...``) cannot trigger them. The
    SHA-256 digest is computed only when at least one category matches.
    """
    name = path.name.lower()
    # Normalise to "/etc/init.d/foo" form so the "/www"-style probes work for
    # top-level directories and on hosts with a different path separator.
    fw_path = "/" + str(rel_path).replace(os.sep, "/").lower()

    categories = []

    # INTERESTING_NAMES contains mixed-case entries such as "rcS", so both
    # sides must be lower-cased for the comparison to be able to match.
    if any(name == known.lower() for known in INTERESTING_NAMES):
        categories.append("interesting_files")

    if any(p in fw_path for p in ("/etc/init.d", "/etc/rc", "/etc/inittab", "/etc/services", "/lib/systemd")):
        categories.append("startup_scripts")

    if any(p in fw_path for p in ("/www", "/web", "/htdocs", "/cgi-bin", "/var/www")):
        categories.append("web_files")

    if any(x in name for x in ("config", ".conf", ".cfg", ".ini", ".json", ".xml", ".yaml", ".yml")):
        categories.append("config_files")

    if "cron" in fw_path:
        categories.append("cron_jobs")

    if name in {"passwd", "shadow", "group"}:
        categories.append("users_groups")

    if any(x in fw_path for x in ("ssh", "dropbear", "authorized_keys", "host_key")):
        categories.append("ssh_related")

    known_binary = name in EXECUTABLE_NAMES
    if not categories and not known_binary:
        return

    info = {"file": rel_path, "sha256": sha256_file(path), "size": size}
    for category in categories:
        add(category, info)

    if known_binary:
        add("interesting_binaries", {**info, "reason": "Common embedded Linux service or utility"})
def scan_busybox(path, text, rel_path):
    """Record BusyBox banner and applet hints found in *text*.

    Nothing is recorded unless "busybox" occurs in the file path or in the
    text (case-insensitive). The entry lists the first banner match, the
    applet keywords that occur anywhere in the text, and up to 100 distinct
    lines (shorter than 250 characters after stripping) that mention one.
    """
    lowered = text.lower()  # computed once; reused for every keyword probe
    if "busybox" not in str(path).lower() and "busybox" not in lowered:
        return

    banner = re.search(r"BusyBox v[\w.\-]+", text)
    applet_keywords = ["telnet", "wget", "tftp", "httpd", "ash", "sh", "nc", "ftpget", "ftpput"]

    applet_hits = set()
    for line in text.splitlines():
        stripped = line.strip()
        if len(stripped) < 250 and any(k in line.lower() for k in applet_keywords):
            applet_hits.add(stripped)

    risky_applets = {k for k in applet_keywords if k in lowered}

    add("busybox", {
        "file": rel_path,
        "banner": banner.group(0) if banner else None,
        "risky_applet_hints": sorted(risky_applets),
        "possible_applets_or_strings": sorted(applet_hits)[:100]
    })


def scan_web_endpoints(path, content, rel_path):
    """Extract routes, form actions, script sources and CGI references.

    Only files whose firmware-relative path looks web related are inspected.
    One ``web_endpoints`` entry summarises the file, and every distinct route
    shorter than 300 characters gets its own ``web_routes`` entry flagged
    with whether it looks administrative.
    """
    # Use the firmware-relative path so host directories above the firmware
    # root (e.g. an extraction under /var/www) do not influence the decision.
    lower_path = "/" + str(rel_path).replace(os.sep, "/").lower()

    web_markers = ("/www", "/web", "/htdocs", "/cgi-bin", ".html", ".js", ".php", ".cgi", ".asp")
    if not any(marker in lower_path for marker in web_markers):
        return

    endpoints = re.findall(r"[\"'](/[^\"'\s<>]{2,})[\"']", content)
    # The form/script patterns must anchor on the opening tag; without the
    # "<form[^>" / "<script[^>" prefix they would require a literal "]"
    # before the attribute and never match real markup.
    forms = re.findall(r"(?i)<form[^>]+action=[\"']?([^\"'>\s]+)", content)
    scripts = re.findall(r"(?i)<script[^>]+src=[\"']?([^\"'>\s]+)", content)
    ajax = re.findall(r"(?i)(?:url|href|src)\s*[:=]\s*[\"']([^\"']+)[\"']", content)
    cgi = re.findall(r"[\w./-]+\.cgi(?:\?[^\"'\s<>]*)?", content)

    all_routes = set(endpoints + forms + scripts + ajax + cgi)
    if not all_routes:
        return

    admin_words = ("admin", "login", "password", "upgrade", "firmware", "reboot", "debug", "shell", "system", "config")
    admin_hits = {route for route in all_routes if any(word in route.lower() for word in admin_words)}

    add("web_endpoints", {
        "file": rel_path,
        "endpoints": sorted(set(endpoints))[:150],
        "forms": sorted(set(forms))[:80],
        "scripts": sorted(set(scripts))[:80],
        "ajax_or_refs": sorted(set(ajax))[:120],
        "cgi_refs": sorted(set(cgi))[:120],
        "admin_like_routes": sorted(admin_hits)[:120]
    })

    for route in sorted(all_routes):
        if len(route) < 300:
            add("web_routes", {
                "route": route,
                "file": rel_path,
                "admin_like": route in admin_hits
            })
def scan_possible_cves(content, rel_path):
    """Record every CVE identifier mentioned in *content* (upper-cased)."""
    for cve in re.findall(r"CVE-\d{4}-\d{4,7}", content, re.I):
        add("cve_references", {"file": rel_path, "cve": cve.upper()})


def yara_scan_file(path, rel_path, yara_rule_path):
    """Run the host ``yara`` tool on *path* and record any rule matches.

    Does nothing when *yara_rule_path* is falsy. A missing ``yara`` binary is
    reported under ``tool_warnings``; any other failure under ``scan_errors``.
    """
    if not yara_rule_path:
        return

    try:
        result = subprocess.run(
            ["yara", "-r", str(yara_rule_path), str(path)],
            capture_output=True,
            text=True,
            timeout=25,
            # Match the other subprocess helpers: odd bytes in the tool
            # output must not abort the scan of this file.
            errors="ignore"
        )

        output = result.stdout.strip()
        if output:
            add("yara_matches", {
                "file": rel_path,
                "matches": output.splitlines()
            })

    except FileNotFoundError:
        add("tool_warnings", {"tool": "yara", "warning": "YARA is not installed or not in PATH"})
    except Exception as e:
        add("scan_errors", {"file": rel_path, "error": f"YARA scan failed: {e}"})


def _symlink_escapes_root(path, root):
    """Return True when *path* is a symlink resolving outside *root*."""
    if not path.is_symlink():
        return False
    try:
        path.resolve().relative_to(Path(root).resolve())
    except (ValueError, OSError, RuntimeError):
        return True
    return False


def scan_file(path, root, yara_rules=None, quick=False):
    """Run every per-file scanner on *path* and record the findings.

    Args:
        path: ``pathlib.Path`` of the file to scan.
        root: firmware root directory, used to compute the reported path.
        yara_rules: optional YARA rule file; enables the YARA scan.
        quick: when true, files larger than 10,000,000 bytes are classified
            by path only and then listed under ``skipped_large_files``.

    Symlinks whose target lies outside *root* are skipped: extracted firmware
    commonly contains absolute links (``/etc/passwd``, ``/bin/busybox``) that
    would otherwise make the scanner read files of the analysis host and
    report them as firmware content. Any exception is recorded under
    ``scan_errors`` instead of propagating.
    """
    try:
        if not path.is_file():
            return
        if _symlink_escapes_root(path, root):
            return

        size = path.stat().st_size
        rel_path = safe_rel(path, root)

        # Hash at most once per file, and only if some record needs it.
        digest_cache = []

        def digest():
            if not digest_cache:
                digest_cache.append(sha256_file(path))
            return digest_cache[0]

        classify_path(path, root, size, rel_path)

        if quick and size > 10_000_000:
            add("skipped_large_files", {
                "file": rel_path,
                "size": size,
                "reason": "Skipped because --quick mode is enabled"
            })
            return

        ent = entropy_file(path)
        if ent is not None and ent >= 7.5 and size > 1024:
            add("high_entropy_files", {
                "file": rel_path,
                "entropy": ent,
                "size": size,
                "sha256": digest(),
                "note": "May indicate compression, encryption, packed data, or a binary blob"
            })

        if is_elf(path):
            info = elf_info(path)
            file_desc = run_file_cmd(path)
            add("elf_binaries", {
                "file": rel_path,
                "sha256": digest(),
                "size": size,
                "elf": info,
                "file_cmd": file_desc
            })

            strings_text = "\n".join(run_strings(path))
            scan_text_content(path, strings_text, rel_path)
            scan_busybox(path, strings_text, rel_path)
            scan_possible_cves(strings_text, rel_path)

        if is_text_file(path):
            content = read_text(path)
            scan_text_content(path, content, rel_path)
            scan_web_endpoints(path, content, rel_path)
            scan_possible_cves(content, rel_path)
            scan_busybox(path, content, rel_path)

        if yara_rules:
            yara_scan_file(path, rel_path, yara_rules)

    except Exception as e:
        add("scan_errors", {"file": str(path), "error": str(e)})
# =============================================================================
# POST PROCESSING
# =============================================================================

def detect_firmware_identity(root):
    """Record firmware identity hints under ``firmware_identity``.

    Two sources are used: well-known release/banner files below *root* (the
    first 1000 characters of each are stored as a preview), and version
    strings already collected in ``REPORT["version_strings"]`` that mention
    a vendor/model-style keyword.
    """
    identity_files = (
        "etc/os-release", "etc/openwrt_release", "etc/openwrt_version",
        "etc/version", "etc/banner", "etc/issue", "version", "release"
    )

    base = Path(root)
    for rel in identity_files:
        candidate = base / rel
        if not (candidate.exists() and candidate.is_file()):
            continue
        text = read_text(candidate, max_chars=5000)
        if not text:
            continue
        add("firmware_identity", {
            "file": rel,
            "content_preview": text[:1000]
        })

    # Heuristic identity from config/version strings
    vendor_model_keywords = ("model", "vendor", "product", "device", "board", "firmware", "version")
    for entry in REPORT.get("version_strings", [])[:200]:
        hints = [
            version for version in entry.get("versions", [])
            if any(keyword in version.lower() for keyword in vendor_model_keywords)
        ]
        if not hints:
            continue
        add("firmware_identity", {
            "file": entry.get("file"),
            "identity_hints": hints[:20]
        })


def summarize_architecture():
    """Tally machine, endianness and class over all recorded ELF binaries.

    Adds one ``architecture_summary`` entry, most common values first, when
    at least one value was counted.
    """
    tallies = {"machine": Counter(), "endian": Counter(), "class": Counter()}

    for entry in REPORT.get("elf_binaries", []):
        header = entry.get("elf", {})
        for field, tally in tallies.items():
            value = header.get(field)
            if value:
                tally[value] += 1

    if not any(tallies.values()):
        return

    add("architecture_summary", {
        "machines": dict(tallies["machine"].most_common()),
        "endian": dict(tallies["endian"].most_common()),
        "class": dict(tallies["class"].most_common())
    })
def summarize_largest_files(root, limit=30):
    """Record the *limit* largest files under *root* in ``largest_files``.

    Files are ranked by size, ties broken by relative path, largest first.
    Only the files that make it into the result are hashed; hashing every
    file in the tree just to discard most digests would re-read the whole
    firmware.
    """
    sized = []
    for dirpath, _, filenames in os.walk(root):
        for filename in filenames:
            candidate = Path(dirpath) / filename
            try:
                if candidate.is_file():
                    sized.append((candidate.stat().st_size, safe_rel(candidate, root), candidate))
            except OSError:
                # Unreadable or vanished entries are simply left out.
                continue

    sized.sort(key=lambda entry: (entry[0], entry[1]), reverse=True)
    for size, rel, candidate in sized[:limit]:
        add("largest_files", {"file": rel, "size": size, "sha256": sha256_file(candidate)})


def run_plugins(plugin_dir, root):
    """Load every ``*.py`` file in *plugin_dir* and call its ``scan(root)``.

    Plugins run in sorted file-name order so reports are reproducible.
    Results are stored under ``plugin_findings``; a plugin without ``scan``
    yields a ``tool_warnings`` entry and a failing plugin a ``scan_errors``
    entry. Does nothing when *plugin_dir* is falsy.

    NOTE: plugin files are executed as Python code in this process; only
    point this at a directory of trusted plugins.
    """
    if not plugin_dir:
        return

    plugin_dir = Path(plugin_dir)
    if not plugin_dir.exists():
        add("tool_warnings", {"tool": "plugins", "warning": f"Plugin directory not found: {plugin_dir}"})
        return

    for plugin_file in sorted(plugin_dir.glob("*.py")):
        try:
            spec = importlib.util.spec_from_file_location(plugin_file.stem, plugin_file)
            if spec is None or spec.loader is None:
                raise ImportError("could not create an import spec")
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            if not hasattr(module, "scan"):
                add("tool_warnings", {
                    "tool": "plugins",
                    "warning": f"{plugin_file.name} has no scan(root) function"
                })
                continue

            result = module.scan(str(root))
            try:
                # The report is later serialised as JSON; a result that
                # cannot be encoded would make the whole report fail.
                json.dumps(result)
            except (TypeError, ValueError):
                add("tool_warnings", {
                    "tool": "plugins",
                    "warning": f"{plugin_file.name} returned a non-JSON-serializable result; stored its repr instead"
                })
                result = repr(result)

            add("plugin_findings", {
                "plugin": plugin_file.name,
                "result": result
            })
        except Exception as e:
            add("scan_errors", {
                "file": str(plugin_file),
                "error": f"Plugin failed: {e}"
            })


def score_findings():
    """Compute the overall risk score and store it in ``REPORT["risk_summary"]``.

    Each weighted category contributes ``count * weight`` capped at six times
    its weight. The level is CRITICAL whenever a critical credential finding
    or any malware indicator exists; otherwise it follows the score
    (>= 100 HIGH, >= 50 MEDIUM, > 0 LOW, else NONE).
    """
    weights = {
        "private_keys": 35,
        "credential_findings": 20,
        "possible_password": 15,
        "wifi_psk": 15,
        "api_key": 15,
        "jwt": 15,
        "users_groups": 12,
        "yara_matches": 35,
        "malware_iocs": 30,
        "interesting_binaries": 8,
        "suspicious_keywords": 6,
        "web_routes": 5,
        "web_endpoints": 5,
        "ssh_related": 5,
        "startup_scripts": 5,
        "cve_references": 5,
        "cve_hints": 3,
    }

    score = 0
    reasons = []

    for category, weight in weights.items():
        count = len(REPORT.get(category, []))
        if not count:
            continue
        added = min(count * weight, weight * 6)
        score += added
        reasons.append(f"{category}: {count} finding(s), +{added}")

    critical = any(
        item.get("severity") == "critical"
        for item in REPORT.get("credential_findings", [])
        if isinstance(item, dict)
    )
    malware = bool(REPORT.get("malware_iocs", []))

    if critical or malware:
        level = "CRITICAL"
    elif score >= 100:
        level = "HIGH"
    elif score >= 50:
        level = "MEDIUM"
    elif score > 0:
        level = "LOW"
    else:
        level = "NONE"

    REPORT["risk_summary"] = [{
        "score": score,
        "level": level,
        "reasons": reasons,
        "notes": [
            "Risk score is a triage aid, not proof of compromise.",
            "Manually validate credentials, CVEs, and malware indicators."
        ]
    }]
# =============================================================================
# REPORTING
# =============================================================================

def write_list_file(output_dir, filename, title, items):
    """Write *items* to ``output_dir / filename`` as a plain-text listing.

    The file starts with *title* and a 100-character ``=`` rule. Dict items
    are written one ``key: value`` line per entry followed by a ``-`` rule;
    anything else is written via ``str()``. An empty *items* produces
    "No findings.". Returns the path written.
    """
    target = output_dir / filename

    lines = [title + "\n", "=" * 100 + "\n\n"]
    if not items:
        lines.append("No findings.\n")
    else:
        for entry in items:
            if isinstance(entry, dict):
                lines.extend(f"{k}: {v}\n" for k, v in entry.items())
                lines.append("-" * 100 + "\n")
            else:
                lines.append(str(entry) + "\n")

    with open(target, "w", encoding="utf-8") as f:
        f.writelines(lines)

    return target


def write_csv(output_dir):
    """Flatten every section in ``IMPORTANT_SECTIONS`` into ``findings.csv``.

    Dict findings contribute their ``file``, ``severity`` and ``confidence``
    fields, a ``value`` taken from ``value``, ``route`` or ``component`` (in
    that order of preference), and the whole entry as JSON in ``details``.
    Non-dict findings are stringified. Returns the path written.
    """
    columns = ["category", "title", "file", "value", "severity", "confidence", "details"]

    def to_row(title, key, item):
        if not isinstance(item, dict):
            return {
                "category": key,
                "title": title,
                "file": "",
                "value": str(item),
                "severity": "",
                "confidence": "",
                "details": str(item)
            }
        return {
            "category": key,
            "title": title,
            "file": item.get("file", ""),
            "value": item.get("value", item.get("route", item.get("component", ""))),
            "severity": item.get("severity", ""),
            "confidence": item.get("confidence", ""),
            "details": json.dumps(item, ensure_ascii=False)
        }

    rows = [
        to_row(title, key, item)
        for title, key, _filename in IMPORTANT_SECTIONS
        for item in REPORT.get(key, [])
    ]

    csv_file = output_dir / "findings.csv"
    with open(csv_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=columns)
        writer.writeheader()
        writer.writerows(rows)

    return csv_file
def write_markdown(output_dir):
    """Write ``firmware_report.md`` into *output_dir* and return its path.

    The report contains the scan metadata, the risk level and score, a table
    with the number of findings per non-empty section, and the list returned
    by ``recommended_steps()``.
    """
    md_file = output_dir / "firmware_report.md"
    risk = REPORT.get("risk_summary", [{}])[0]
    totals = REPORT.get("scan_totals", [{}])[0]
    meta = REPORT.get("scan_metadata", [{}])[0]

    with open(md_file, "w", encoding="utf-8") as f:
        emit = f.write

        emit(f"# Firmware Hunter Pro v{VERSION} Report\n\n")
        emit("## Scan Info\n\n")
        emit(f"- Firmware root: `{meta.get('root', 'Unknown')}`\n")
        emit(f"- Started: `{meta.get('started', 'Unknown')}`\n")
        emit(f"- Finished: `{meta.get('finished', 'Unknown')}`\n")
        emit(f"- Total files: `{totals.get('total_files', 0)}`\n")
        emit(f"- Total size: `{totals.get('total_size_bytes', 0)}` bytes\n")
        emit(f"- Quick mode: `{meta.get('quick_mode', False)}`\n\n")

        emit("## Risk Summary\n\n")
        emit(f"- Level: **{risk.get('level', 'UNKNOWN')}**\n")
        emit(f"- Score: **{risk.get('score', 0)}**\n\n")

        emit("## Findings Overview\n\n")
        emit("| Finding | Count |\n|---|---:|\n")
        for title, key, _ in IMPORTANT_SECTIONS:
            found = len(REPORT.get(key, []))
            if found:
                emit(f"| {title} | {found} |\n")

        emit("\n## Recommended Next Steps\n\n")
        for step in recommended_steps():
            emit(f"- {step}\n")

    return md_file
+ + + +
+

Firmware Hunter Pro v{VERSION}

+

Offline firmware triage report

+
+
+
+
Risk{html.escape(str(risk.get('level', 'UNKNOWN')))}
+
Risk Score{html.escape(str(risk.get('score', 0)))}
+
Total Files{html.escape(str(totals.get('total_files', 0)))}
+
Total Size{html.escape(str(totals.get('total_size_bytes', 0)))} bytes
+
+ +
+

Scan Info

+

Firmware Root: {html.escape(str(meta.get('root', 'Unknown')))}

+

Started: {html.escape(str(meta.get('started', 'Unknown')))}

+

Finished: {html.escape(str(meta.get('finished', 'Unknown')))}

+

Quick Mode: {html.escape(str(meta.get('quick_mode', False)))}

+

Worker Jobs: {html.escape(str(meta.get('jobs', 'Unknown')))}

+
+ +
+

Findings Overview

+""") + + for title, key, filename in IMPORTANT_SECTIONS: + count = len(REPORT.get(key, [])) + if count: + f.write(f"

{html.escape(title)}: {count} — {html.escape(filename)}

\n") + + f.write("""
+
+

Recommended Next Steps

+
    +""") + for step in recommended_steps(): + f.write(f"
  • {html.escape(step)}
  • \n") + + f.write("""
+
+ +

Detailed Findings

+ +""") + + for title, key, _filename in IMPORTANT_SECTIONS: + items = REPORT.get(key, []) + if not items: + continue + f.write(f"
{html.escape(title)} ({len(items)})\n") + f.write("
")
+            f.write(html.escape(json.dumps(items, indent=2, ensure_ascii=False)))
+            f.write("
\n") + + f.write("
") + + return html_file + + +def recommended_steps(): + return [ + "Review credential_findings.txt, possible_passwords.txt, and wifi_psks.txt for hardcoded credentials.", + "Review startup_scripts.txt to understand what starts at boot.", + "Review web_routes.txt and web_endpoints.txt for login panels, CGI handlers, firmware update routes, and debug routes.", + "Review interesting_binaries.txt for telnet, dropbear, busybox, wget, tftp, netcat, and web servers.", + "Review private_keys.txt and certificates.txt for exposed secrets or reused keys.", + "Review components.txt and cve_hints.txt, then verify versions against authoritative vulnerability databases.", + "Review malware_iocs.txt and yara_matches.txt, then manually validate before making conclusions.", + "Use full_report.json or findings.csv for automation, diffing, or importing into other tools." + ] + + +def write_reports(base_output): + timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + output_dir = Path(base_output) / f"scan_{timestamp}" + output_dir.mkdir(parents=True, exist_ok=True) + + json_file = output_dir / "full_report.json" + main_report = output_dir / "firmware_report.txt" + summary_file = output_dir / "summary.txt" + + normal_report = {k: v for k, v in REPORT.items()} + + with open(json_file, "w", encoding="utf-8") as f: + json.dump(normal_report, f, indent=4, ensure_ascii=False) + + risk = REPORT.get("risk_summary", [{}])[0] + totals = REPORT.get("scan_totals", [{}])[0] + meta = REPORT.get("scan_metadata", [{}])[0] + + for title, key, filename in IMPORTANT_SECTIONS: + items = REPORT.get(key, []) + if items: + write_list_file(output_dir, filename, title, items) + + csv_file = write_csv(output_dir) + md_file = write_markdown(output_dir) + html_file = write_html_report(output_dir) + + with open(summary_file, "w", encoding="utf-8") as f: + f.write("Firmware Hunter Pro Summary\n") + f.write("=" * 100 + "\n\n") + f.write(f"Scan Time: {timestamp}\n") + f.write(f"Firmware Root: {meta.get('root', 
def _first_report_entry(key):
    """Return the first record stored under *key* in REPORT, or ``{}``."""
    # REPORT is a defaultdict(list); a key that was only ever read holds [],
    # which .get() returns as-is, so guard against the empty list as well.
    return (REPORT.get(key) or [{}])[0]


def _write_summary_file(summary_file, timestamp, meta, totals, risk):
    """Write the short plain-text summary (scan info, risk, non-zero counts)."""
    with open(summary_file, "w", encoding="utf-8") as f:
        f.write("Firmware Hunter Pro Summary\n")
        f.write("=" * 100 + "\n\n")
        f.write(f"Scan Time: {timestamp}\n")
        f.write(f"Firmware Root: {meta.get('root', 'Unknown')}\n")
        f.write(f"Quick Mode: {meta.get('quick_mode', False)}\n")
        f.write(f"Jobs: {meta.get('jobs', 'Unknown')}\n")
        f.write(f"Total Files: {totals.get('total_files', 0)}\n")
        f.write(f"Total Size: {totals.get('total_size_bytes', 0)} bytes\n\n")

        f.write("Risk Summary\n")
        f.write("-" * 100 + "\n")
        f.write(f"Risk Level: {risk.get('level', 'UNKNOWN')}\n")
        f.write(f"Risk Score: {risk.get('score', 0)}\n\n")

        f.write("Finding Counts\n")
        f.write("-" * 100 + "\n")
        for title, key, _ in IMPORTANT_SECTIONS:
            count = len(REPORT.get(key, []))
            if count:
                f.write(f"{title:<35} {count}\n")


def _write_main_text_report(main_report, output_dir, meta, totals, risk):
    """Write the full plain-text report, ending with the generated-file list."""
    with open(main_report, "w", encoding="utf-8") as f:
        f.write("Firmware Hunter Pro Report\n")
        f.write("=" * 100 + "\n")
        f.write("Offline firmware triage report\n")
        f.write("=" * 100 + "\n\n")

        f.write("[SCAN INFO]\n")
        f.write(f"Firmware Root : {meta.get('root', 'Unknown')}\n")
        f.write(f"Started : {meta.get('started', 'Unknown')}\n")
        f.write(f"Finished : {meta.get('finished', 'Unknown')}\n")
        f.write(f"Quick Mode : {meta.get('quick_mode', False)}\n")
        f.write(f"Worker Jobs : {meta.get('jobs', 'Unknown')}\n")
        f.write(f"Total Files : {totals.get('total_files', 0)}\n")
        f.write(f"Total Size : {totals.get('total_size_bytes', 0)} bytes\n\n")

        f.write("[RISK SUMMARY]\n")
        f.write(f"Level : {risk.get('level', 'UNKNOWN')}\n")
        f.write(f"Score : {risk.get('score', 0)}\n\n")

        reasons = risk.get("reasons")
        if reasons:
            f.write("Reasons:\n")
            for reason in reasons:
                f.write(f" - {reason}\n")
            f.write("\n")

        f.write("[FINDINGS OVERVIEW]\n")
        for title, key, filename in IMPORTANT_SECTIONS:
            count = len(REPORT.get(key, []))
            if count:
                f.write(f"[+] {title:<35} {count:<5} -> {filename}\n")
            else:
                f.write(f"[-] {title:<35} 0\n")

        f.write("\n[RECOMMENDED NEXT STEPS]\n")
        f.write("-" * 100 + "\n")
        for idx, step in enumerate(recommended_steps(), 1):
            f.write(f"{idx}. {step}\n")

        # Listed while this file is still open, so the main report itself
        # (already created on disk) appears in the listing too.
        f.write("\n[GENERATED FILES]\n")
        for file in sorted(output_dir.iterdir()):
            f.write(f"- {file.name}\n")


def write_reports(base_output):
    """Write every report format for the current REPORT and print their paths.

    Creates ``<base_output>/scan_<timestamp>/`` and fills it, in this order,
    with: ``full_report.json``, one list file per non-empty entry of
    IMPORTANT_SECTIONS (via write_list_file), the CSV, Markdown and HTML
    reports, ``summary.txt`` and ``firmware_report.txt``.

    Args:
        base_output: Base output directory (``str`` or ``pathlib.Path``);
            created if it does not exist.

    Returns:
        ``pathlib.Path`` of the scan directory that was written.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_dir = Path(base_output) / f"scan_{timestamp}"
    output_dir.mkdir(parents=True, exist_ok=True)

    json_file = output_dir / "full_report.json"
    main_report = output_dir / "firmware_report.txt"
    summary_file = output_dir / "summary.txt"

    # Dump a plain dict copy rather than the defaultdict itself.
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(dict(REPORT), f, indent=4, ensure_ascii=False)

    risk = _first_report_entry("risk_summary")
    totals = _first_report_entry("scan_totals")
    meta = _first_report_entry("scan_metadata")

    for title, key, filename in IMPORTANT_SECTIONS:
        items = REPORT.get(key, [])
        if items:
            write_list_file(output_dir, filename, title, items)

    csv_file = write_csv(output_dir)
    md_file = write_markdown(output_dir)
    html_file = write_html_report(output_dir)

    _write_summary_file(summary_file, timestamp, meta, totals, risk)
    _write_main_text_report(main_report, output_dir, meta, totals, risk)

    print()
    print("=" * 100)
    print(" Firmware Hunter Pro Scan Complete")
    print("=" * 100)
    print(f" Output Folder : {output_dir}")
    print(f" Main Report : {main_report}")
    print(f" Summary : {summary_file}")
    print(f" HTML Report : {html_file}")
    print(f" Markdown : {md_file}")
    print(f" CSV Findings : {csv_file}")
    print(f" JSON Report : {json_file}")
    print("=" * 100)

    return output_dir
def scan_firmware(root, yara_rules=None, quick=False, jobs=8, plugins=None):
    """Scan an extracted firmware tree and populate the global REPORT.

    Walks *root*, submits every regular file to scan_file on a thread pool,
    then runs the post-processing steps (detect_firmware_identity,
    summarize_architecture, summarize_largest_files, run_plugins,
    score_findings) and stores ``scan_metadata`` and ``scan_totals`` in
    REPORT.

    Args:
        root: Path to the extracted firmware / rootfs directory.
        yara_rules: Optional YARA rule path, passed through to scan_file.
        quick: Quick-mode flag, passed through to scan_file.
        jobs: Requested number of worker threads; values below 1 are
            raised to 1.
        plugins: Optional plugins directory, passed to run_plugins.

    Raises:
        FileNotFoundError: *root* does not exist.
        NotADirectoryError: *root* exists but is not a directory.
    """
    root = Path(root).resolve()

    if not root.exists():
        raise FileNotFoundError(f"Firmware path does not exist: {root}")

    if not root.is_dir():
        raise NotADirectoryError(f"Firmware path should be an extracted directory/rootfs: {root}")

    # Clamp once so the printout, the metadata and the pool all agree.
    workers = max(1, jobs)

    files = []
    total_size = 0

    for dirpath, _, filenames in os.walk(root):
        base = Path(dirpath)
        for filename in filenames:
            path = base / filename
            if not path.is_file():
                continue
            files.append(path)
            try:
                total_size += path.stat().st_size
            except OSError:
                # Size accounting is best-effort; the file is still scanned.
                pass

    REPORT["scan_metadata"] = [{
        "tool": "Firmware Hunter Pro",
        "version": VERSION,
        "root": str(root),
        "started": now_iso(),
        "quick_mode": quick,
        "jobs": workers,
        "yara_rules": str(yara_rules) if yara_rules else None,
        "plugins": str(plugins) if plugins else None
    }]

    print(f"[+] Files discovered: {len(files)}")
    print(f"[+] Total size: {total_size} bytes")
    print(f"[+] Worker threads: {workers}")

    # Monotonic clock: elapsed time must not jump if the wall clock is adjusted.
    start_time = time.monotonic()

    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Remember which file each future belongs to, so that a failure can
        # be attributed to it instead of being logged as "unknown".
        pending = {
            executor.submit(scan_file, path, root, yara_rules, quick): path
            for path in files
        }

        for completed, future in enumerate(as_completed(pending), 1):
            try:
                future.result()
            except Exception as e:
                # Boundary handler: one bad file must not abort the whole scan.
                add("scan_errors", {"file": str(pending[future]), "error": str(e)})

            if completed % 500 == 0:
                # Floor at one second to keep the rate sane for very fast runs.
                elapsed = max(time.monotonic() - start_time, 1)
                rate = completed / elapsed
                print(f"[+] Progress: {completed}/{len(files)} files scanned ({rate:.1f} files/sec)")

    detect_firmware_identity(root)
    summarize_architecture()
    summarize_largest_files(root)
    run_plugins(plugins, root)
    score_findings()

    REPORT["scan_totals"] = [{
        "total_files": len(files),
        "total_size_bytes": total_size,
        "elapsed_seconds": round(time.monotonic() - start_time, 2)
    }]

    REPORT["scan_metadata"][0]["finished"] = now_iso()
def main():
    """Command-line entry point: parse arguments, scan, write reports.

    Steps: parse the CLI options, call check_dependencies, optionally
    extract the target with extract_with_binwalk (``--extract``), run
    scan_firmware on the resulting directory and finally call
    write_reports.

    Returns:
        ``0`` on success, ``1`` when the target (or a file needed while
        extracting it) is missing or the scan root is not a directory.
    """
    parser = argparse.ArgumentParser(
        description="Firmware Hunter Pro v4.0 - advanced offline firmware triage scanner"
    )

    parser.add_argument("target", help="Path to extracted firmware/rootfs directory, or firmware image with --extract")
    parser.add_argument("-o", "--output", default="firmware_hunter_output", help="Base output directory")
    parser.add_argument("--extract", action="store_true", help="Use binwalk to extract target first, then scan extraction")
    parser.add_argument("--yara", help="Optional path to YARA rule file or directory")
    parser.add_argument("--quick", action="store_true", help="Quick mode: skip files larger than 10 MB")
    parser.add_argument("-j", "--jobs", type=int, default=8, help="Number of worker threads, default: 8")
    parser.add_argument("--plugins", help="Optional plugins directory. Each plugin should expose scan(root).")

    args = parser.parse_args()

    print(f"[+] Firmware Hunter Pro v{VERSION}")
    print("[+] Safe mode: this tool does not execute firmware binaries.")

    check_dependencies(yara_requested=bool(args.yara), extract_requested=args.extract)

    target = Path(args.target).resolve()

    try:
        scan_root = extract_with_binwalk(target, args.output) if args.extract else target

        print(f"[+] Scanning: {scan_root}")

        scan_firmware(
            scan_root,
            yara_rules=args.yara,
            quick=args.quick,
            jobs=args.jobs,
            plugins=args.plugins
        )
    except (FileNotFoundError, NotADirectoryError) as exc:
        # A wrong path is a user error, not a crash: report it in one line
        # instead of a traceback and signal failure through the exit code.
        print(f"[-] {exc}", file=sys.stderr)
        return 1

    write_reports(args.output)
    return 0


if __name__ == "__main__":
    sys.exit(main())