Firmware_Hunter_Pro/firmware_hunter_pro_v4.py
2026-05-20 17:37:38 -07:00

1388 lines
48 KiB
Python

#!/usr/bin/env python3
"""
Firmware Hunter Pro v4.0
A safer offline firmware triage framework for extracted router / IoT firmware.
What it does:
- Scans extracted firmware directories safely without executing firmware binaries.
- Optionally extracts a firmware image with binwalk if binwalk is installed.
- Finds credentials, keys, certs, IPs, domains, URLs, MACs, JWTs, API keys, hashes, CVEs.
- Maps web UI files, CGI handlers, forms, scripts, endpoints, and admin-looking routes.
- Detects ELF binaries, architecture hints, BusyBox, Linux kernel, OpenSSL, Dropbear, dnsmasq, uClibc/musl/glibc.
- Scores findings with severity and confidence.
- Produces TXT, JSON, HTML, Markdown, CSV, and separate evidence files.
- Supports simple external plugins from a plugins directory.
Safe by default:
- Does not execute firmware binaries.
- Only reads files and optionally runs host tools like strings, file, binwalk, yara.
Usage:
python3 firmware_hunter_pro.py /path/to/squashfs-root
python3 firmware_hunter_pro.py firmware.bin --extract
python3 firmware_hunter_pro.py /path/to/rootfs --quick
python3 firmware_hunter_pro.py /path/to/rootfs --yara rules.yar
python3 firmware_hunter_pro.py /path/to/rootfs --plugins plugins/
"""
import os
import re
import csv
import sys
import json
import math
import time
import html
import shutil
import hashlib
import argparse
import tempfile
import subprocess
import importlib.util
from pathlib import Path
from datetime import datetime
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
VERSION = "4.0"
REPORT = defaultdict(list)
REPORT_LOCK = Lock()
# =============================================================================
# CONFIG
# =============================================================================
TEXT_EXTENSIONS = {
".txt", ".log", ".conf", ".cfg", ".ini", ".json", ".xml", ".html", ".htm",
".js", ".php", ".asp", ".cgi", ".lua", ".sh", ".service", ".default",
".profile", ".passwd", ".shadow", ".pem", ".key", ".crt", ".pub", ".yaml",
".yml", ".env", ".properties", ".rc", ".rules"
}
INTERESTING_NAMES = {
"passwd", "shadow", "group", "hosts", "resolv.conf", "inittab", "fstab",
"rcS", "rc.local", "profile", "authorized_keys", "known_hosts", "motd",
"issue", "services", "inetd", "inetd.conf", "udhcpd.conf", "dnsmasq.conf",
"dropbear", "telnetd", "lighttpd.conf", "boa.conf", "httpd.conf",
"mini_httpd.conf", "uhttpd.conf", "nginx.conf", "config.xml", "nvram",
"default.cfg", "wpa_supplicant.conf", "os-release", "version", "release",
"syslog.conf", "crontab"
}
EXECUTABLE_NAMES = {
"busybox", "telnetd", "dropbear", "sshd", "httpd", "boa", "lighttpd",
"mini_httpd", "uhttpd", "nginx", "nc", "netcat", "wget", "curl", "tftp",
"ftpget", "ftpput", "iptables", "ip6tables", "dnsmasq", "udhcpd", "pppd",
"openvpn", "openssl", "sqlite3", "ash", "sh"
}
SUSPICIOUS_KEYWORDS = {
"telnetd", "dropbear", "busybox telnet", "nc -l", "netcat", "/bin/sh",
"/bin/ash", "reverse shell", "backdoor", "wget http", "curl http", "tftp",
"ftpget", "ftpput", "chmod 777", "0.0.0.0", "admin:admin", "root:root",
"password", "passwd", "shadow", "debug", "test", "factory", "developer",
"enable telnet", "remote shell", "hardcoded", "debug shell", "diagnostic",
"support account", "superuser", "hidden", "maintenance"
}
MALWARE_IOC_KEYWORDS = {
"mirai": ["mirai", "busybox MIRAI", "/bin/busybox", "report.%s", "scanListen"],
"gafgyt/bashlite": ["gafgyt", "bashlite", "gayfgt", "loligang", "telnet scanner"],
"mozi": ["mozi", "Mozi.m", "dht.transmissionbt.com", "router.bittorrent.com"],
"xorddos": ["xorddos", "x0r", "/tmp/.x", "BB2FA36AAA9541F0"],
"miner": ["stratum+tcp", "xmrig", "minerd", "cryptonight", "monero"],
"generic_bot": ["CNC", "C2", "botnet", "udp flood", "syn flood", "http flood"]
}
REGEX_PATTERNS = {
"ipv4": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
"mac_address": r"\b(?:[0-9a-fA-F]{2}[:-]){5}[0-9a-fA-F]{2}\b",
"url": r"https?://[^\s'\"<>]{4,}",
"email": r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+",
"domain": r"\b(?:[a-zA-Z0-9-]+\.)+(?:com|net|org|io|cn|ru|info|biz|us|uk|co|dev|cloud|local|lan)\b",
"md5": r"\b[a-fA-F0-9]{32}\b",
"sha1": r"\b[a-fA-F0-9]{40}\b",
"sha256": r"\b[a-fA-F0-9]{64}\b",
"jwt": r"eyJ[a-zA-Z0-9_\-]+\.[a-zA-Z0-9_\-]+\.[a-zA-Z0-9_\-]+",
"api_key": r"(?i)(api[_-]?key|apikey|access[_-]?key|secret[_-]?key|client_secret)\s*[:=]\s*['\"]?([a-zA-Z0-9_\-]{12,})",
"wifi_psk": r"(?i)(passphrase|psk|wpa_pass|wifi_pass|wifi_password|wireless_key|wpakey)\s*[:=]\s*['\"]?([^\s'\";&]{8,63})",
"possible_password": r"(?i)(password|passwd|pwd|pass|admin_pass|root_pass|web_pass|wifi_pass|secret|token)\s*[:=]\s*['\"]?([^\s'\";&]{4,})",
"possible_username": r"(?i)(username|user|login|admin_user|root_user)\s*[:=]\s*['\"]?([^\s'\";&]{3,})",
"mqtt_credential": r"(?i)(mqtt_(?:user|username|pass|password)|mqttUser|mqttPass)\s*[:=]\s*['\"]?([^\s'\";&]{3,})",
"basic_auth": r"(?i)authorization:\s*basic\s+([a-zA-Z0-9+/=]{8,})",
}
PRIVATE_KEY_MARKERS = {
"-----BEGIN RSA PRIVATE KEY-----",
"-----BEGIN DSA PRIVATE KEY-----",
"-----BEGIN EC PRIVATE KEY-----",
"-----BEGIN OPENSSH PRIVATE KEY-----",
"-----BEGIN PRIVATE KEY-----"
}
CERT_MARKERS = {"-----BEGIN CERTIFICATE-----"}
COMPONENT_PATTERNS = {
"busybox": [
r"BusyBox v([\w.\-]+)",
r"busybox\s+v([\w.\-]+)"
],
"linux_kernel": [
r"Linux version ([\w.\-]+)",
r"kernel version[:= ]+([\w.\-]+)"
],
"openssl": [
r"OpenSSL\s+([0-9][\w.\-]+[a-z]?)",
r"openssl-([0-9][\w.\-]+)"
],
"dropbear": [
r"Dropbear sshd v?([0-9][\w.\-]+)",
r"dropbear[_ -]?([0-9]{4}\.[0-9]{2})"
],
"dnsmasq": [
r"dnsmasq-?([0-9][\w.\-]+)",
r"dnsmasq version ([0-9][\w.\-]+)"
],
"uClibc": [
r"uClibc-?([0-9][\w.\-]+)"
],
"musl": [
r"musl-?([0-9][\w.\-]+)"
],
"glibc": [
r"GNU C Library.*?release version ([0-9][\w.\-]+)",
r"GLIBC_([0-9.]+)"
],
"lighttpd": [
r"lighttpd/([0-9][\w.\-]+)"
],
"boa": [
r"Boa/([0-9][\w.\-]+)"
],
"uhttpd": [
r"uhttpd[-/ ]([0-9][\w.\-]+)"
]
}
CVE_HINTS = {
"busybox": {
"note": "BusyBox is frequently old in embedded firmware. Verify exact version against NVD/vendor advisories.",
"keywords": ["busybox", "ash", "udhcp", "telnetd"]
},
"openssl": {
"note": "OpenSSL version detected. Check for old TLS/crypto CVEs and weak certificate/key usage.",
"keywords": ["openssl", "libssl", "libcrypto"]
},
"dropbear": {
"note": "Dropbear SSH detected. Check version against Dropbear security advisories.",
"keywords": ["dropbear", "sshd"]
},
"dnsmasq": {
"note": "dnsmasq detected. Check version for DNS/DHCP vulnerabilities.",
"keywords": ["dnsmasq"]
},
"boa": {
"note": "Boa web server is commonly outdated in IoT firmware.",
"keywords": ["boa"]
}
}
IMPORTANT_SECTIONS = [
("Credential Findings", "credential_findings", "credential_findings.txt"),
("Possible Passwords", "possible_password", "possible_passwords.txt"),
("Wi-Fi PSKs", "wifi_psk", "wifi_psks.txt"),
("Possible Usernames", "possible_username", "possible_usernames.txt"),
("API Keys / Secrets", "api_key", "api_keys.txt"),
("MQTT Credentials", "mqtt_credential", "mqtt_credentials.txt"),
("JWT Tokens", "jwt", "jwt_tokens.txt"),
("Private Keys", "private_keys", "private_keys.txt"),
("Certificates", "certificates", "certificates.txt"),
("IP Addresses", "ipv4", "ip_addresses.txt"),
("MAC Addresses", "mac_address", "mac_addresses.txt"),
("URLs", "url", "urls.txt"),
("Domains", "domain", "domains.txt"),
("Emails", "email", "emails.txt"),
("Component Versions", "components", "components.txt"),
("Version Strings", "version_strings", "version_strings.txt"),
("Firmware Identity", "firmware_identity", "firmware_identity.txt"),
("Startup Scripts", "startup_scripts", "startup_scripts.txt"),
("Interesting Files", "interesting_files", "interesting_files.txt"),
("Interesting Binaries", "interesting_binaries", "interesting_binaries.txt"),
("ELF Binaries", "elf_binaries", "elf_binaries.txt"),
("Architecture Summary", "architecture_summary", "architecture_summary.txt"),
("Web Files", "web_files", "web_files.txt"),
("Web Routes", "web_routes", "web_routes.txt"),
("Web Endpoints", "web_endpoints", "web_endpoints.txt"),
("Cron Jobs", "cron_jobs", "cron_jobs.txt"),
("Users / Groups", "users_groups", "users_groups.txt"),
("SSH Related Files", "ssh_related", "ssh_related.txt"),
("Suspicious Keywords", "suspicious_keywords", "suspicious_keywords.txt"),
("Malware IOC Matches", "malware_iocs", "malware_iocs.txt"),
("High Entropy Files", "high_entropy_files", "high_entropy_files.txt"),
("Largest Files", "largest_files", "largest_files.txt"),
("BusyBox Findings", "busybox", "busybox.txt"),
("CVE References", "cve_references", "cve_references.txt"),
("CVE Hints", "cve_hints", "cve_hints.txt"),
("YARA Matches", "yara_matches", "yara_matches.txt"),
("Plugin Findings", "plugin_findings", "plugin_findings.txt"),
("Skipped Large Files", "skipped_large_files", "skipped_large_files.txt"),
("Tool Warnings", "tool_warnings", "tool_warnings.txt"),
("Scan Errors", "scan_errors", "scan_errors.txt"),
]
# =============================================================================
# BASIC HELPERS
# =============================================================================
def add(category, data):
"""Thread-safe de-duplicated report append."""
with REPORT_LOCK:
if data not in REPORT[category]:
REPORT[category].append(data)
def safe_rel(path, root):
try:
return str(Path(path).resolve().relative_to(Path(root).resolve()))
except Exception:
return str(path)
def now_iso():
return datetime.now().isoformat(timespec="seconds")
def check_dependencies(yara_requested=False, extract_requested=False):
dependencies = ["strings"]
if yara_requested:
dependencies.append("yara")
if extract_requested:
dependencies.append("binwalk")
for tool in dependencies:
if shutil.which(tool) is None:
add("tool_warnings", {
"tool": tool,
"warning": f"'{tool}' was not found in PATH. Some features may be limited."
})
print(f"[!] Warning: '{tool}' not found. Some features may be limited.")
def sha256_file(path):
try:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
except Exception:
return None
def entropy_file(path, max_bytes=1024 * 1024):
try:
with open(path, "rb") as f:
data = f.read(max_bytes)
if not data:
return 0.0
counts = Counter(data)
length = len(data)
entropy = -sum((count / length) * math.log2(count / length) for count in counts.values())
return round(entropy, 4)
except Exception:
return None
def is_text_file(path):
if path.suffix.lower() in TEXT_EXTENSIONS:
return True
try:
with open(path, "rb") as f:
chunk = f.read(4096)
if not chunk:
return False
return chunk.count(b"\x00") / len(chunk) < 0.08
except Exception:
return False
def read_text(path, max_chars=None):
try:
text = path.read_text(errors="ignore")
if max_chars:
return text[:max_chars]
return text
except Exception:
return ""
def is_elf(path):
try:
with open(path, "rb") as f:
return f.read(4) == b"\x7fELF"
except Exception:
return False
def elf_info(path):
try:
with open(path, "rb") as f:
header = f.read(20)
if len(header) < 20:
return {}
elf_class = header[4]
endian = header[5]
byte_order = "little" if endian == 1 else "big" if endian == 2 else "little"
machine = int.from_bytes(header[18:20], byte_order)
arch_map = {
3: "x86",
8: "MIPS",
20: "PowerPC",
40: "ARM",
62: "x86_64",
183: "AArch64",
243: "RISC-V"
}
return {
"class": "32-bit" if elf_class == 1 else "64-bit" if elf_class == 2 else "unknown",
"endian": "little" if endian == 1 else "big" if endian == 2 else "unknown",
"machine": arch_map.get(machine, f"unknown-{machine}")
}
except Exception:
return {}
def run_strings(path, limit=1200):
try:
result = subprocess.run(
["strings", str(path)],
capture_output=True,
text=True,
timeout=20,
errors="ignore"
)
return result.stdout.splitlines()[:limit]
except Exception:
return []
def run_file_cmd(path):
if shutil.which("file") is None:
return None
try:
result = subprocess.run(
["file", "-b", str(path)],
capture_output=True,
text=True,
timeout=5,
errors="ignore"
)
return result.stdout.strip()
except Exception:
return None
# =============================================================================
# EXTRACTION
# =============================================================================
def extract_with_binwalk(firmware_image, output_base):
firmware_image = Path(firmware_image).resolve()
extract_dir = Path(output_base).resolve() / f"binwalk_extract_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
extract_dir.mkdir(parents=True, exist_ok=True)
if shutil.which("binwalk") is None:
raise RuntimeError("binwalk not found. Install it or scan an already extracted rootfs directory.")
print(f"[+] Extracting with binwalk into: {extract_dir}")
try:
subprocess.run(
["binwalk", "-Me", str(firmware_image), "--directory", str(extract_dir)],
check=False,
timeout=1800
)
except subprocess.TimeoutExpired:
add("tool_warnings", {"tool": "binwalk", "warning": "binwalk extraction timed out"})
except Exception as e:
add("tool_warnings", {"tool": "binwalk", "warning": f"binwalk extraction failed: {e}"})
candidates = find_rootfs_candidates(extract_dir)
if candidates:
print("[+] Possible rootfs candidates:")
for idx, cand in enumerate(candidates[:10], 1):
print(f" {idx}. {cand}")
return candidates[0]
print("[!] No obvious rootfs found. Scanning extraction directory instead.")
return extract_dir
def find_rootfs_candidates(base_dir):
base_dir = Path(base_dir)
candidates = []
for dirpath, dirnames, filenames in os.walk(base_dir):
d = Path(dirpath)
names = set(filenames) | set(dirnames)
score = 0
if "etc" in names:
score += 3
if "bin" in names:
score += 2
if "sbin" in names:
score += 2
if "www" in names or "web" in names or "htdocs" in names:
score += 2
if "passwd" in names or "shadow" in names:
score += 2
if "init.d" in names:
score += 2
if score >= 5:
candidates.append((score, d))
candidates.sort(key=lambda x: x[0], reverse=True)
return [c[1] for c in candidates]
# =============================================================================
# SCANNERS
# =============================================================================
def extract_version_strings(text):
patterns = [
r"(?i)(version|build|release|fw_ver|firmware|software|sw_ver|hardware|hw_ver|model|vendor|product)\s*[:=]\s*['\"]?([\w.\-/ ]{3,80})",
r"(?i)(busybox\s+v\d{1,3}\.\d{1,3}(?:\.\d{1,5})?)",
r"(?i)(linux\s+version\s+[\w.\-]+)",
r"\bV(\d{1,3}\.\d{1,3}\.\d{1,5})\b",
r"\b(\d{4}-\d{2}-\d{2})\b",
]
versions = set()
for pattern in patterns:
for match in re.findall(pattern, text):
if isinstance(match, tuple):
value = match[-1]
else:
value = match
value = " ".join(value.strip().split())
if 3 <= len(value) <= 120:
versions.add(value)
return sorted(versions)[:40]
def detect_components(text, rel_path):
for component, patterns in COMPONENT_PATTERNS.items():
for pattern in patterns:
for match in re.findall(pattern, text, re.I):
version = match[-1] if isinstance(match, tuple) else match
version = str(version).strip()
if version:
add("components", {
"component": component,
"version": version,
"file": rel_path
})
if component in CVE_HINTS:
add("cve_hints", {
"component": component,
"version": version,
"file": rel_path,
"note": CVE_HINTS[component]["note"]
})
def detect_malware_iocs(text, rel_path):
low = text.lower()
for family, indicators in MALWARE_IOC_KEYWORDS.items():
hits = []
for indicator in indicators:
if indicator.lower() in low:
hits.append(indicator)
if hits:
add("malware_iocs", {
"family_or_category": family,
"file": rel_path,
"indicators": sorted(set(hits)),
"confidence": "medium" if len(hits) == 1 else "high"
})
def scan_text_content(path, content, rel_path):
for name, pattern in REGEX_PATTERNS.items():
for match in re.findall(pattern, content):
if isinstance(match, tuple):
finding = {"file": rel_path, "key": match[0], "value": match[1]}
value_for_validation = match[1]
else:
finding = {"file": rel_path, "value": match}
value_for_validation = match
if name == "ipv4":
parts = value_for_validation.split(".")
if not all(p.isdigit() and 0 <= int(p) <= 255 for p in parts):
continue
add(name, finding)
if name in {"possible_password", "wifi_psk", "api_key", "mqtt_credential"}:
add_credential_finding(name, finding)
for marker in PRIVATE_KEY_MARKERS:
if marker in content:
add("private_keys", {"file": rel_path, "marker": marker})
add("credential_findings", {
"file": rel_path,
"type": "private_key",
"value": marker,
"confidence": "high",
"severity": "critical",
"reason": "Private key material marker found"
})
for marker in CERT_MARKERS:
if marker in content:
add("certificates", {"file": rel_path, "marker": marker})
lowered = content.lower()
for keyword in SUSPICIOUS_KEYWORDS:
if keyword.lower() in lowered:
add("suspicious_keywords", {"file": rel_path, "keyword": keyword})
versions = extract_version_strings(content)
if versions:
add("version_strings", {"file": rel_path, "versions": versions})
detect_components(content, rel_path)
detect_malware_iocs(content, rel_path)
def add_credential_finding(kind, finding):
value = str(finding.get("value", ""))
key = str(finding.get("key", ""))
file = finding.get("file", "")
confidence = "medium"
severity = "medium"
reasons = []
if kind in {"api_key", "wifi_psk"}:
confidence = "high"
severity = "high"
reasons.append(f"{kind} pattern matched")
if key.lower() in {"password", "passwd", "pwd", "admin_pass", "root_pass", "secret", "token"}:
confidence = "high"
severity = "high"
reasons.append("credential-like key name")
if value.lower() in {"admin", "root", "password", "123456", "12345678", "admin123", "root123"}:
confidence = "high"
severity = "high"
reasons.append("common/default credential value")
if any(x in file.lower() for x in ["shadow", "passwd", "default", "config", "nvram", "wpa"]):
reasons.append("sensitive config path")
if confidence == "medium":
confidence = "high"
add("credential_findings", {
"file": file,
"type": kind,
"key": key,
"value": value,
"confidence": confidence,
"severity": severity,
"reason": "; ".join(reasons) if reasons else "credential-like pattern matched"
})
def classify_path(path, root, size, rel_path):
lower = str(path).lower()
name = path.name.lower()
info = {"file": rel_path, "sha256": sha256_file(path), "size": size}
if name in INTERESTING_NAMES:
add("interesting_files", info)
if any(p in lower for p in ["/etc/init.d", "/etc/rc", "/etc/inittab", "/etc/services", "/lib/systemd"]):
add("startup_scripts", info)
if any(p in lower for p in ["/www", "/web", "/htdocs", "/cgi-bin", "/var/www"]):
add("web_files", info)
if any(x in name for x in ["config", ".conf", ".cfg", ".ini", ".json", ".xml", ".yaml", ".yml"]):
add("config_files", info)
if "cron" in lower or "crontab" in lower:
add("cron_jobs", info)
if name in {"passwd", "shadow", "group"}:
add("users_groups", info)
if any(x in lower for x in ["ssh", "dropbear", "authorized_keys", "host_key"]):
add("ssh_related", info)
if name in EXECUTABLE_NAMES:
add("interesting_binaries", {**info, "reason": "Common embedded Linux service or utility"})
def scan_busybox(path, text, rel_path):
if "busybox" not in str(path).lower() and "busybox" not in text.lower():
return
banner = re.search(r"BusyBox v[\w.\-]+", text)
applet_hits = []
applet_keywords = ["telnet", "wget", "tftp", "httpd", "ash", "sh", "nc", "ftpget", "ftpput"]
for line in text.splitlines():
low = line.lower()
if any(x in low for x in applet_keywords) and len(line.strip()) < 250:
applet_hits.append(line.strip())
risky_applets = []
for app in applet_keywords:
if app in text.lower():
risky_applets.append(app)
add("busybox", {
"file": rel_path,
"banner": banner.group(0) if banner else None,
"risky_applet_hints": sorted(set(risky_applets)),
"possible_applets_or_strings": sorted(set(applet_hits))[:100]
})
def scan_web_endpoints(path, content, rel_path):
lower_path = str(path).lower()
if not any(x in lower_path for x in ["/www", "/web", "/htdocs", "/cgi-bin", ".html", ".js", ".php", ".cgi", ".asp"]):
return
endpoints = re.findall(r"[\"'](/[^\"'\s<>]{2,})[\"']", content)
forms = re.findall(r"(?i)<form[^>]+action=[\"']?([^\"'>\s]+)", content)
scripts = re.findall(r"(?i)<script[^>]+src=[\"']?([^\"'>\s]+)", content)
ajax = re.findall(r"(?i)(?:url|href|src)\s*[:=]\s*[\"']([^\"']+)[\"']", content)
cgi = re.findall(r"[\w./-]+\.cgi(?:\?[^\"'\s<>]*)?", content)
admin_hits = []
for route in set(endpoints + forms + scripts + ajax + cgi):
if any(k in route.lower() for k in ["admin", "login", "password", "upgrade", "firmware", "reboot", "debug", "shell", "system", "config"]):
admin_hits.append(route)
if endpoints or forms or scripts or ajax or cgi:
data = {
"file": rel_path,
"endpoints": sorted(set(endpoints))[:150],
"forms": sorted(set(forms))[:80],
"scripts": sorted(set(scripts))[:80],
"ajax_or_refs": sorted(set(ajax))[:120],
"cgi_refs": sorted(set(cgi))[:120],
"admin_like_routes": sorted(set(admin_hits))[:120]
}
add("web_endpoints", data)
for route in sorted(set(endpoints + forms + scripts + ajax + cgi + admin_hits)):
if len(route) < 300:
add("web_routes", {
"route": route,
"file": rel_path,
"admin_like": route in admin_hits
})
def scan_possible_cves(content, rel_path):
for cve in re.findall(r"CVE-\d{4}-\d{4,7}", content, re.I):
add("cve_references", {"file": rel_path, "cve": cve.upper()})
def yara_scan_file(path, rel_path, yara_rule_path):
if not yara_rule_path:
return
try:
result = subprocess.run(
["yara", "-r", str(yara_rule_path), str(path)],
capture_output=True,
text=True,
timeout=25
)
if result.stdout.strip():
add("yara_matches", {
"file": rel_path,
"matches": result.stdout.strip().splitlines()
})
except FileNotFoundError:
add("tool_warnings", {"tool": "yara", "warning": "YARA is not installed or not in PATH"})
except Exception as e:
add("scan_errors", {"file": rel_path, "error": f"YARA scan failed: {e}"})
def scan_file(path, root, yara_rules=None, quick=False):
try:
if not path.is_file():
return
size = path.stat().st_size
rel_path = safe_rel(path, root)
classify_path(path, root, size, rel_path)
if quick and size > 10_000_000:
add("skipped_large_files", {
"file": rel_path,
"size": size,
"reason": "Skipped because --quick mode is enabled"
})
return
ent = entropy_file(path)
if ent is not None and ent >= 7.5 and size > 1024:
add("high_entropy_files", {
"file": rel_path,
"entropy": ent,
"size": size,
"sha256": sha256_file(path),
"note": "May indicate compression, encryption, packed data, or a binary blob"
})
if is_elf(path):
info = elf_info(path)
file_desc = run_file_cmd(path)
add("elf_binaries", {
"file": rel_path,
"sha256": sha256_file(path),
"size": size,
"elf": info,
"file_cmd": file_desc
})
strings_text = "\n".join(run_strings(path))
scan_text_content(path, strings_text, rel_path)
scan_busybox(path, strings_text, rel_path)
scan_possible_cves(strings_text, rel_path)
if is_text_file(path):
content = read_text(path)
scan_text_content(path, content, rel_path)
scan_web_endpoints(path, content, rel_path)
scan_possible_cves(content, rel_path)
scan_busybox(path, content, rel_path)
if yara_rules:
yara_scan_file(path, rel_path, yara_rules)
except Exception as e:
add("scan_errors", {"file": str(path), "error": str(e)})
# =============================================================================
# POST PROCESSING
# =============================================================================
def detect_firmware_identity(root):
identity_files = [
"etc/os-release", "etc/openwrt_release", "etc/openwrt_version",
"etc/version", "etc/banner", "etc/issue", "version", "release"
]
for rel in identity_files:
path = Path(root) / rel
if path.exists() and path.is_file():
text = read_text(path, max_chars=5000)
if text:
add("firmware_identity", {
"file": rel,
"content_preview": text[:1000]
})
# Heuristic identity from config/version strings
vendor_model_keywords = ["model", "vendor", "product", "device", "board", "firmware", "version"]
for item in REPORT.get("version_strings", [])[:200]:
versions = item.get("versions", [])
hits = [v for v in versions if any(k in v.lower() for k in vendor_model_keywords)]
if hits:
add("firmware_identity", {
"file": item.get("file"),
"identity_hints": hits[:20]
})
def summarize_architecture():
machines = Counter()
endians = Counter()
classes = Counter()
for item in REPORT.get("elf_binaries", []):
elf = item.get("elf", {})
if elf.get("machine"):
machines[elf["machine"]] += 1
if elf.get("endian"):
endians[elf["endian"]] += 1
if elf.get("class"):
classes[elf["class"]] += 1
if machines or endians or classes:
add("architecture_summary", {
"machines": dict(machines.most_common()),
"endian": dict(endians.most_common()),
"class": dict(classes.most_common())
})
def summarize_largest_files(root, limit=30):
files = []
for dirpath, _, filenames in os.walk(root):
for filename in filenames:
p = Path(dirpath) / filename
try:
if p.is_file():
files.append((p.stat().st_size, safe_rel(p, root), sha256_file(p)))
except Exception:
pass
for size, rel, digest in sorted(files, reverse=True)[:limit]:
add("largest_files", {"file": rel, "size": size, "sha256": digest})
def run_plugins(plugin_dir, root):
if not plugin_dir:
return
plugin_dir = Path(plugin_dir)
if not plugin_dir.exists():
add("tool_warnings", {"tool": "plugins", "warning": f"Plugin directory not found: {plugin_dir}"})
return
for plugin_file in plugin_dir.glob("*.py"):
try:
spec = importlib.util.spec_from_file_location(plugin_file.stem, plugin_file)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if hasattr(module, "scan"):
result = module.scan(str(root))
add("plugin_findings", {
"plugin": plugin_file.name,
"result": result
})
else:
add("tool_warnings", {
"tool": "plugins",
"warning": f"{plugin_file.name} has no scan(root) function"
})
except Exception as e:
add("scan_errors", {
"file": str(plugin_file),
"error": f"Plugin failed: {e}"
})
def score_findings():
weights = {
"private_keys": 35,
"credential_findings": 20,
"possible_password": 15,
"wifi_psk": 15,
"api_key": 15,
"jwt": 15,
"users_groups": 12,
"yara_matches": 35,
"malware_iocs": 30,
"interesting_binaries": 8,
"suspicious_keywords": 6,
"web_routes": 5,
"web_endpoints": 5,
"ssh_related": 5,
"startup_scripts": 5,
"cve_references": 5,
"cve_hints": 3,
}
score = 0
reasons = []
for category, weight in weights.items():
count = len(REPORT.get(category, []))
if count:
added = min(count * weight, weight * 6)
score += added
reasons.append(f"{category}: {count} finding(s), +{added}")
critical = any(x.get("severity") == "critical" for x in REPORT.get("credential_findings", []) if isinstance(x, dict))
malware = len(REPORT.get("malware_iocs", [])) > 0
if score >= 180 or critical or malware:
level = "CRITICAL" if critical or malware else "HIGH"
elif score >= 100:
level = "HIGH"
elif score >= 50:
level = "MEDIUM"
elif score > 0:
level = "LOW"
else:
level = "NONE"
REPORT["risk_summary"] = [{
"score": score,
"level": level,
"reasons": reasons,
"notes": [
"Risk score is a triage aid, not proof of compromise.",
"Manually validate credentials, CVEs, and malware indicators."
]
}]
# =============================================================================
# REPORTING
# =============================================================================
def write_list_file(output_dir, filename, title, items):
path = output_dir / filename
with open(path, "w", encoding="utf-8") as f:
f.write(title + "\n")
f.write("=" * 100 + "\n\n")
if not items:
f.write("No findings.\n")
return path
for item in items:
if isinstance(item, dict):
for k, v in item.items():
f.write(f"{k}: {v}\n")
f.write("-" * 100 + "\n")
else:
f.write(str(item) + "\n")
return path
def write_csv(output_dir):
csv_file = output_dir / "findings.csv"
rows = []
for title, key, _filename in IMPORTANT_SECTIONS:
for item in REPORT.get(key, []):
if isinstance(item, dict):
rows.append({
"category": key,
"title": title,
"file": item.get("file", ""),
"value": item.get("value", item.get("route", item.get("component", ""))),
"severity": item.get("severity", ""),
"confidence": item.get("confidence", ""),
"details": json.dumps(item, ensure_ascii=False)
})
else:
rows.append({
"category": key,
"title": title,
"file": "",
"value": str(item),
"severity": "",
"confidence": "",
"details": str(item)
})
with open(csv_file, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=["category", "title", "file", "value", "severity", "confidence", "details"])
writer.writeheader()
writer.writerows(rows)
return csv_file
def write_markdown(output_dir):
md_file = output_dir / "firmware_report.md"
risk = REPORT.get("risk_summary", [{}])[0]
totals = REPORT.get("scan_totals", [{}])[0]
meta = REPORT.get("scan_metadata", [{}])[0]
with open(md_file, "w", encoding="utf-8") as f:
f.write(f"# Firmware Hunter Pro v{VERSION} Report\n\n")
f.write("## Scan Info\n\n")
f.write(f"- Firmware root: `{meta.get('root', 'Unknown')}`\n")
f.write(f"- Started: `{meta.get('started', 'Unknown')}`\n")
f.write(f"- Finished: `{meta.get('finished', 'Unknown')}`\n")
f.write(f"- Total files: `{totals.get('total_files', 0)}`\n")
f.write(f"- Total size: `{totals.get('total_size_bytes', 0)}` bytes\n")
f.write(f"- Quick mode: `{meta.get('quick_mode', False)}`\n\n")
f.write("## Risk Summary\n\n")
f.write(f"- Level: **{risk.get('level', 'UNKNOWN')}**\n")
f.write(f"- Score: **{risk.get('score', 0)}**\n\n")
f.write("## Findings Overview\n\n")
f.write("| Finding | Count |\n|---|---:|\n")
for title, key, _ in IMPORTANT_SECTIONS:
count = len(REPORT.get(key, []))
if count:
f.write(f"| {title} | {count} |\n")
f.write("\n## Recommended Next Steps\n\n")
for step in recommended_steps():
f.write(f"- {step}\n")
return md_file
def write_html_report(output_dir):
html_file = output_dir / "firmware_report.html"
risk = REPORT.get("risk_summary", [{}])[0]
totals = REPORT.get("scan_totals", [{}])[0]
meta = REPORT.get("scan_metadata", [{}])[0]
severity_color = {
"CRITICAL": "#ef4444",
"HIGH": "#f97316",
"MEDIUM": "#eab308",
"LOW": "#22c55e",
"NONE": "#94a3b8"
}.get(risk.get("level", "UNKNOWN"), "#94a3b8")
with open(html_file, "w", encoding="utf-8") as f:
f.write(f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Firmware Hunter Pro Report</title>
<style>
body {{ font-family: Arial, sans-serif; background: #0f172a; color: #e5e7eb; margin: 0; }}
header {{ background: #020617; padding: 24px 32px; border-bottom: 1px solid #334155; }}
main {{ padding: 24px 32px; }}
h1 {{ margin: 0; color: #7dd3fc; }}
h2 {{ color: #bfdbfe; margin-top: 28px; }}
.card {{ background: #111827; border: 1px solid #334155; border-radius: 14px; padding: 18px; margin-bottom: 18px; box-shadow: 0 10px 24px rgba(0,0,0,.25); }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(220px, 1fr)); gap: 14px; }}
.metric {{ background: #1e293b; border-radius: 12px; padding: 14px; border: 1px solid #334155; }}
.metric b {{ display: block; color: #93c5fd; margin-bottom: 8px; }}
.badge {{ display: inline-block; padding: 6px 12px; border-radius: 999px; background: {severity_color}; color: #111827; font-weight: bold; }}
pre {{ background: #020617; color: #d1d5db; padding: 14px; border-radius: 10px; overflow-x: auto; }}
details {{ margin: 12px 0; background: #111827; border: 1px solid #334155; border-radius: 10px; padding: 12px; }}
summary {{ cursor: pointer; font-weight: bold; color: #93c5fd; }}
.count {{ color: #facc15; font-weight: bold; }}
input {{ width: 100%; padding: 12px; border-radius: 10px; border: 1px solid #334155; background: #020617; color: #e5e7eb; margin: 12px 0; }}
.small {{ color: #94a3b8; font-size: 13px; }}
a {{ color: #93c5fd; }}
</style>
<script>
function filterFindings() {{
var input = document.getElementById("search").value.toLowerCase();
var blocks = document.getElementsByTagName("details");
for (var i = 0; i < blocks.length; i++) {{
var text = blocks[i].innerText.toLowerCase();
blocks[i].style.display = text.includes(input) ? "" : "none";
}}
}}
</script>
</head>
<body>
<header>
<h1>Firmware Hunter Pro v{VERSION}</h1>
<p class="small">Offline firmware triage report</p>
</header>
<main>
<div class="grid">
<div class="metric"><b>Risk</b><span class="badge">{html.escape(str(risk.get('level', 'UNKNOWN')))}</span></div>
<div class="metric"><b>Risk Score</b>{html.escape(str(risk.get('score', 0)))}</div>
<div class="metric"><b>Total Files</b>{html.escape(str(totals.get('total_files', 0)))}</div>
<div class="metric"><b>Total Size</b>{html.escape(str(totals.get('total_size_bytes', 0)))} bytes</div>
</div>
<div class="card">
<h2>Scan Info</h2>
<p><b>Firmware Root:</b> {html.escape(str(meta.get('root', 'Unknown')))}</p>
<p><b>Started:</b> {html.escape(str(meta.get('started', 'Unknown')))}</p>
<p><b>Finished:</b> {html.escape(str(meta.get('finished', 'Unknown')))}</p>
<p><b>Quick Mode:</b> {html.escape(str(meta.get('quick_mode', False)))}</p>
<p><b>Worker Jobs:</b> {html.escape(str(meta.get('jobs', 'Unknown')))}</p>
</div>
<div class="card">
<h2>Findings Overview</h2>
""")
for title, key, filename in IMPORTANT_SECTIONS:
count = len(REPORT.get(key, []))
if count:
f.write(f"<p><b>{html.escape(title)}</b>: <span class='count'>{count}</span> — {html.escape(filename)}</p>\n")
f.write("""</div>
<div class="card">
<h2>Recommended Next Steps</h2>
<ul>
""")
for step in recommended_steps():
f.write(f"<li>{html.escape(step)}</li>\n")
f.write("""</ul>
</div>
<h2>Detailed Findings</h2>
<input id="search" onkeyup="filterFindings()" placeholder="Search findings...">
""")
for title, key, _filename in IMPORTANT_SECTIONS:
items = REPORT.get(key, [])
if not items:
continue
f.write(f"<details><summary>{html.escape(title)} ({len(items)})</summary>\n")
f.write("<pre>")
f.write(html.escape(json.dumps(items, indent=2, ensure_ascii=False)))
f.write("</pre></details>\n")
f.write("</main></body></html>")
return html_file
def recommended_steps():
return [
"Review credential_findings.txt, possible_passwords.txt, and wifi_psks.txt for hardcoded credentials.",
"Review startup_scripts.txt to understand what starts at boot.",
"Review web_routes.txt and web_endpoints.txt for login panels, CGI handlers, firmware update routes, and debug routes.",
"Review interesting_binaries.txt for telnet, dropbear, busybox, wget, tftp, netcat, and web servers.",
"Review private_keys.txt and certificates.txt for exposed secrets or reused keys.",
"Review components.txt and cve_hints.txt, then verify versions against authoritative vulnerability databases.",
"Review malware_iocs.txt and yara_matches.txt, then manually validate before making conclusions.",
"Use full_report.json or findings.csv for automation, diffing, or importing into other tools."
]
def write_reports(base_output):
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_dir = Path(base_output) / f"scan_{timestamp}"
output_dir.mkdir(parents=True, exist_ok=True)
json_file = output_dir / "full_report.json"
main_report = output_dir / "firmware_report.txt"
summary_file = output_dir / "summary.txt"
normal_report = {k: v for k, v in REPORT.items()}
with open(json_file, "w", encoding="utf-8") as f:
json.dump(normal_report, f, indent=4, ensure_ascii=False)
risk = REPORT.get("risk_summary", [{}])[0]
totals = REPORT.get("scan_totals", [{}])[0]
meta = REPORT.get("scan_metadata", [{}])[0]
for title, key, filename in IMPORTANT_SECTIONS:
items = REPORT.get(key, [])
if items:
write_list_file(output_dir, filename, title, items)
csv_file = write_csv(output_dir)
md_file = write_markdown(output_dir)
html_file = write_html_report(output_dir)
with open(summary_file, "w", encoding="utf-8") as f:
f.write("Firmware Hunter Pro Summary\n")
f.write("=" * 100 + "\n\n")
f.write(f"Scan Time: {timestamp}\n")
f.write(f"Firmware Root: {meta.get('root', 'Unknown')}\n")
f.write(f"Quick Mode: {meta.get('quick_mode', False)}\n")
f.write(f"Jobs: {meta.get('jobs', 'Unknown')}\n")
f.write(f"Total Files: {totals.get('total_files', 0)}\n")
f.write(f"Total Size: {totals.get('total_size_bytes', 0)} bytes\n\n")
f.write("Risk Summary\n")
f.write("-" * 100 + "\n")
f.write(f"Risk Level: {risk.get('level', 'UNKNOWN')}\n")
f.write(f"Risk Score: {risk.get('score', 0)}\n\n")
f.write("Finding Counts\n")
f.write("-" * 100 + "\n")
for title, key, _ in IMPORTANT_SECTIONS:
count = len(REPORT.get(key, []))
if count:
f.write(f"{title:<35} {count}\n")
with open(main_report, "w", encoding="utf-8") as f:
f.write("Firmware Hunter Pro Report\n")
f.write("=" * 100 + "\n")
f.write("Offline firmware triage report\n")
f.write("=" * 100 + "\n\n")
f.write("[SCAN INFO]\n")
f.write(f"Firmware Root : {meta.get('root', 'Unknown')}\n")
f.write(f"Started : {meta.get('started', 'Unknown')}\n")
f.write(f"Finished : {meta.get('finished', 'Unknown')}\n")
f.write(f"Quick Mode : {meta.get('quick_mode', False)}\n")
f.write(f"Worker Jobs : {meta.get('jobs', 'Unknown')}\n")
f.write(f"Total Files : {totals.get('total_files', 0)}\n")
f.write(f"Total Size : {totals.get('total_size_bytes', 0)} bytes\n\n")
f.write("[RISK SUMMARY]\n")
f.write(f"Level : {risk.get('level', 'UNKNOWN')}\n")
f.write(f"Score : {risk.get('score', 0)}\n\n")
if risk.get("reasons"):
f.write("Reasons:\n")
for reason in risk.get("reasons", []):
f.write(f" - {reason}\n")
f.write("\n")
f.write("[FINDINGS OVERVIEW]\n")
for title, key, filename in IMPORTANT_SECTIONS:
count = len(REPORT.get(key, []))
if count:
f.write(f"[+] {title:<35} {count:<5} -> {filename}\n")
else:
f.write(f"[-] {title:<35} 0\n")
f.write("\n[RECOMMENDED NEXT STEPS]\n")
f.write("-" * 100 + "\n")
for idx, step in enumerate(recommended_steps(), 1):
f.write(f"{idx}. {step}\n")
f.write("\n[GENERATED FILES]\n")
for file in sorted(output_dir.iterdir()):
f.write(f"- {file.name}\n")
print()
print("=" * 100)
print(" Firmware Hunter Pro Scan Complete")
print("=" * 100)
print(f" Output Folder : {output_dir}")
print(f" Main Report : {main_report}")
print(f" Summary : {summary_file}")
print(f" HTML Report : {html_file}")
print(f" Markdown : {md_file}")
print(f" CSV Findings : {csv_file}")
print(f" JSON Report : {json_file}")
print("=" * 100)
# =============================================================================
# MAIN SCAN
# =============================================================================
def scan_firmware(root, yara_rules=None, quick=False, jobs=8, plugins=None):
root = Path(root).resolve()
if not root.exists():
raise FileNotFoundError(f"Firmware path does not exist: {root}")
if not root.is_dir():
raise NotADirectoryError(f"Firmware path should be an extracted directory/rootfs: {root}")
files = []
total_size = 0
for dirpath, _, filenames in os.walk(root):
for filename in filenames:
path = Path(dirpath) / filename
if path.is_file():
files.append(path)
try:
total_size += path.stat().st_size
except Exception:
pass
REPORT["scan_metadata"] = [{
"tool": "Firmware Hunter Pro",
"version": VERSION,
"root": str(root),
"started": now_iso(),
"quick_mode": quick,
"jobs": jobs,
"yara_rules": str(yara_rules) if yara_rules else None,
"plugins": str(plugins) if plugins else None
}]
print(f"[+] Files discovered: {len(files)}")
print(f"[+] Total size: {total_size} bytes")
print(f"[+] Worker threads: {jobs}")
completed = 0
start_time = time.time()
with ThreadPoolExecutor(max_workers=max(1, jobs)) as executor:
futures = [executor.submit(scan_file, f, root, yara_rules, quick) for f in files]
for future in as_completed(futures):
completed += 1
try:
future.result()
except Exception as e:
add("scan_errors", {"file": "unknown", "error": str(e)})
if completed % 500 == 0:
elapsed = max(time.time() - start_time, 1)
rate = completed / elapsed
print(f"[+] Progress: {completed}/{len(files)} files scanned ({rate:.1f} files/sec)")
detect_firmware_identity(root)
summarize_architecture()
summarize_largest_files(root)
run_plugins(plugins, root)
score_findings()
REPORT["scan_totals"] = [{
"total_files": len(files),
"total_size_bytes": total_size,
"elapsed_seconds": round(time.time() - start_time, 2)
}]
REPORT["scan_metadata"][0]["finished"] = now_iso()
def main():
parser = argparse.ArgumentParser(
description="Firmware Hunter Pro v4.0 - advanced offline firmware triage scanner"
)
parser.add_argument("target", help="Path to extracted firmware/rootfs directory, or firmware image with --extract")
parser.add_argument("-o", "--output", default="firmware_hunter_output", help="Base output directory")
parser.add_argument("--extract", action="store_true", help="Use binwalk to extract target first, then scan extraction")
parser.add_argument("--yara", help="Optional path to YARA rule file or directory")
parser.add_argument("--quick", action="store_true", help="Quick mode: skip files larger than 10 MB")
parser.add_argument("-j", "--jobs", type=int, default=8, help="Number of worker threads, default: 8")
parser.add_argument("--plugins", help="Optional plugins directory. Each plugin should expose scan(root).")
args = parser.parse_args()
print(f"[+] Firmware Hunter Pro v{VERSION}")
print("[+] Safe mode: this tool does not execute firmware binaries.")
check_dependencies(yara_requested=bool(args.yara), extract_requested=args.extract)
target = Path(args.target).resolve()
if args.extract:
scan_root = extract_with_binwalk(target, args.output)
else:
scan_root = target
print(f"[+] Scanning: {scan_root}")
scan_firmware(
scan_root,
yara_rules=args.yara,
quick=args.quick,
jobs=args.jobs,
plugins=args.plugins
)
write_reports(args.output)
if __name__ == "__main__":
main()