349 lines
12 KiB
Python
349 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Container Escape Attempts
|
|
Various techniques to escape container confinement
|
|
"""
|
|
|
|
import os, sys, subprocess, json, re, shutil, stat
|
|
from pathlib import Path
|
|
|
|
def check_privileges():
    """Collect basic privilege information about the current process.

    Returns:
        dict: May contain the following keys:

        * ``privileged`` (bool) - whether the effective capability mask
          (``CapEff`` in ``/proc/self/status``) includes every bit of the
          ``0x3fffffffff`` mask. Absent when the status file cannot be read.
        * ``capabilities`` (str) - the raw ``CapEff`` value, when present.
        * ``is_root`` (bool) - whether the effective UID is 0.
        * ``mounts`` (list[str]) - the first 10 lines printed by ``mount``.
        * ``sensitive_mounts`` (list[str]) - entries from a fixed list of
          paths that occur (as a substring) in any line of ``mount`` output.
    """
    # Mask that the original exact-string check ("0000003fffffffff") encoded.
    full_cap_mask = 0x3fffffffff
    results = {}

    try:
        with open('/proc/self/status', 'r') as f:
            content = f.read()
    except OSError:
        # No readable status file: leave 'privileged'/'capabilities' unset,
        # callers use .get() with a fallback.
        content = None

    if content is not None:
        caps_match = re.search(r'CapEff:\s*(.+)', content)
        cap_eff = caps_match.group(1).strip() if caps_match else None

        # Compare numerically rather than by exact string so that a mask with
        # additional higher bits set is still recognised as a full set.
        try:
            cap_bits = int(cap_eff, 16) if cap_eff is not None else 0
        except ValueError:
            cap_bits = 0
        results['privileged'] = (cap_bits & full_cap_mask) == full_cap_mask

        if cap_eff is not None:
            results['capabilities'] = cap_eff

    # Check for root user
    results['is_root'] = os.geteuid() == 0

    # Only a short preview of the mount table is reported...
    mount_lines = subprocess.getoutput('mount').split('\n')
    results['mounts'] = mount_lines[:10]

    # ...but sensitive paths are searched for in the whole table, not just
    # the preview, so entries beyond line 10 are not missed.
    sensitive_mounts = ['/proc', '/sys', '/dev', '/var/run/docker.sock']
    results['sensitive_mounts'] = [
        mount for mount in sensitive_mounts
        if any(mount in line for line in mount_lines)
    ]

    return results
|
|
|
|
def _query_docker_api(docker_socket, endpoint):
    """Run ``curl`` against *endpoint* of the Docker API on *docker_socket*."""
    cmd = ['curl', '-s', '--unix-socket', docker_socket, f'http://localhost/{endpoint}']
    return subprocess.run(cmd, capture_output=True, text=True, timeout=5)


def attempt_docker_socket_escape():
    """Check whether the Docker API is reachable through its Unix socket.

    Returns:
        dict: Always contains ``attempted``, ``success``, ``details`` and
        ``socket_exists``. ``socket_accessible`` is added when the socket
        exists, and ``containers`` (a count) when the container listing
        could be retrieved. ``success`` is True only when the ``/version``
        endpoint returned a JSON object.
    """
    results = {'attempted': False, 'success': False, 'details': ''}
    docker_socket = '/var/run/docker.sock'

    if not os.path.exists(docker_socket):
        results['socket_exists'] = False
        return results

    results['attempted'] = True
    results['socket_exists'] = True

    if not os.access(docker_socket, os.R_OK):
        results['socket_accessible'] = False
        results['details'] = "Docker socket exists but not accessible"
        return results

    results['socket_accessible'] = True

    try:
        version = _query_docker_api(docker_socket, 'version')
        if version.returncode != 0:
            results['details'] = f"Docker API error: {version.stderr}"
            return results

        docker_info = json.loads(version.stdout)
        if not isinstance(docker_info, dict):
            raise ValueError("unexpected /version response")
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # Covers a missing curl binary, a timeout, and a non-JSON reply.
        results['details'] = f"Exception: {e}"
        return results

    # Only report success once the reply has actually been parsed.
    results['success'] = True
    results['details'] = f"Docker API accessible: {docker_info.get('Version')}"

    # The container count is supplementary: a failure here must not
    # overwrite the details recorded above.
    try:
        containers = _query_docker_api(docker_socket, 'containers/json')
        if containers.returncode == 0:
            container_list = json.loads(containers.stdout)
            results['containers'] = len(container_list)
    except (OSError, subprocess.SubprocessError, ValueError, TypeError):
        pass

    return results
|
|
|
|
def _release_agent_is_writable(path):
    """Return True when a test write to *path* succeeds.

    The previous content is read first and written back afterwards on a
    best-effort basis, so the probe does not leave its marker behind.
    """
    try:
        with open(path, 'r') as f:
            previous = f.read()
    except OSError:
        previous = None

    try:
        with open(path, 'w') as f:
            f.write('test')
    except OSError:
        return False

    if previous is not None:
        try:
            with open(path, 'w') as f:
                # An empty string would not produce a write at all, so send a
                # bare newline instead.
                # NOTE(review): presumably the kernel strips surrounding
                # whitespace from this value - confirm.
                f.write(previous if previous.strip() else '\n')
        except OSError:
            pass

    return True


def attempt_cgroup_escape():
    """Check whether any cgroup ``release_agent`` file is writable.

    Candidate files are found with two glob patterns under
    ``/sys/fs/cgroup``. The search stops at the first writable file.

    Returns:
        dict: ``attempted``, ``success`` and ``details``; plus
        ``writable_release_agent`` (the path) when a writable file was found.
        ``attempted`` and ``success`` are both True in that case and both
        False otherwise.
    """
    import glob

    results = {'attempted': False, 'success': False, 'details': ''}

    cgroup_paths = [
        '/sys/fs/cgroup/*/release_agent',
        '/sys/fs/cgroup/*/*/release_agent'
    ]

    # Flatten both patterns into one stream so a single ``break`` ends the
    # whole search instead of only the inner loop.
    candidates = (
        path
        for pattern in cgroup_paths
        for path in glob.glob(pattern)
    )

    for release_agent in candidates:
        if not _release_agent_is_writable(release_agent):
            continue

        results['attempted'] = True
        results['writable_release_agent'] = release_agent
        results['details'] = f"Writable release_agent: {release_agent}"

        # Note: this only checks for the condition; nothing further is done.
        results['success'] = True
        break

    if not results['attempted']:
        results['details'] = "No writable release_agent found"

    return results
|
|
|
|
def attempt_device_escape(dev_path='/dev',
                          dangerous_devices=('sda', 'nvme0n1', 'dm-0', 'loop0')):
    """Check whether selected device nodes are readable.

    Args:
        dev_path: Directory that holds the device nodes.
        dangerous_devices: Node names, relative to *dev_path*, to look for.

    Returns:
        dict: ``attempted``, ``success`` and ``details``; plus
        ``accessible_devices`` (list of names) when at least one node exists
        and passes ``os.access(..., os.R_OK)``. ``success`` is True when 512
        bytes could be read from the first accessible node.
    """
    results = {'attempted': False, 'success': False, 'details': ''}

    accessible_devices = []
    for device in dangerous_devices:
        device_path = os.path.join(dev_path, device)
        if os.path.exists(device_path) and os.access(device_path, os.R_OK):
            accessible_devices.append(device)

    if not accessible_devices:
        return results

    results['attempted'] = True
    results['accessible_devices'] = accessible_devices
    results['details'] = f"Accessible devices: {accessible_devices}"

    # Only the first accessible node is probed. The read is done in-process
    # (unbuffered, one 512-byte read) so the check does not depend on an
    # external binary and uses the same path that was just tested above.
    device_path = os.path.join(dev_path, accessible_devices[0])
    try:
        with open(device_path, 'rb', buffering=0) as dev:
            dev.read(512)
    except OSError:
        # os.access() said readable but the read itself failed; leave
        # 'success' False and keep the details gathered so far.
        pass
    else:
        results['success'] = True
        results['details'] += f" - Can read from {device_path}"

    return results
|
|
|
|
def attempt_kernel_module_load():
    """Check whether ``insmod`` is rejected for lack of permission.

    A throw-away text file (not a real module) is passed to ``insmod`` and
    the error output is inspected for ``Operation not permitted``.

    Returns:
        dict: ``attempted``, ``success`` and ``details``; plus
        ``kernel_version`` (the first entry of ``/lib/modules``) when that
        directory is non-empty. Nothing is attempted when it is missing or
        empty.
    """
    import tempfile

    results = {'attempted': False, 'success': False, 'details': ''}

    modules_path = '/lib/modules'
    try:
        kernel_versions = os.listdir(modules_path)
    except OSError:
        kernel_versions = []
    if not kernel_versions:
        return results

    results['kernel_version'] = kernel_versions[0]

    test_module = None
    try:
        # Unique temp file instead of a fixed, predictable /tmp path.
        with tempfile.NamedTemporaryFile('w', suffix='.ko', delete=False) as f:
            f.write('dummy module')
            test_module = f.name

        output = subprocess.run(['insmod', test_module],
                                capture_output=True, text=True, timeout=5)
        denied = 'Operation not permitted' in output.stderr

        if output.returncode != 0 and denied:
            results['details'] = "Cannot load modules (no CAP_SYS_MODULE)"
        else:
            # Any outcome other than a permission error is reported as an
            # attempt; it counts as success unless the permission message
            # appeared.
            results['attempted'] = True
            results['details'] = f"Module load attempted: {output.stderr}"
            results['success'] = not denied
    except (OSError, subprocess.SubprocessError) as e:
        # e.g. insmod binary missing, or the call timed out.
        results['details'] = f"Exception: {e}"
    finally:
        # Always remove the dummy file, including when insmod raised.
        if test_module is not None:
            try:
                os.remove(test_module)
            except OSError:
                pass

    return results
|
|
|
|
def attempt_mount_escape():
    """Check whether the process can create a bind mount.

    ``/tmp`` is bind-mounted onto a freshly created temporary directory and
    unmounted again straight away.

    Returns:
        dict: ``attempted``, ``success`` and ``details``. ``success`` is True
        when ``mount --bind`` exited with status 0.
    """
    import tempfile

    results = {'attempted': False, 'success': False, 'details': ''}

    test_dir = None
    try:
        # A fresh, uniquely named directory: cleanup can never touch a
        # directory that existed before this call.
        test_dir = tempfile.mkdtemp(prefix='test_mount')

        output = subprocess.run(['mount', '--bind', '/tmp', test_dir],
                                capture_output=True, text=True, timeout=5)

        if output.returncode == 0:
            results['attempted'] = True
            results['success'] = True
            results['details'] = "Can create bind mounts"

            # Cleanup
            subprocess.run(['umount', test_dir], capture_output=True, timeout=5)
        elif 'Operation not permitted' in output.stderr:
            results['details'] = "Cannot mount (no CAP_SYS_ADMIN)"
        else:
            results['attempted'] = True
            results['details'] = f"Mount test: {output.stderr}"
    except (OSError, subprocess.SubprocessError) as e:
        results['details'] = f"Exception: {e}"
    finally:
        if test_dir is not None:
            try:
                # rmdir, never a recursive delete: if the unmount failed the
                # directory still exposes /tmp, and a recursive removal would
                # wipe it. rmdir simply fails in that case.
                os.rmdir(test_dir)
            except OSError:
                pass

    return results
|
|
|
|
def _parse_kernel_release(release):
    """Return ``(major, minor, patch)`` parsed from a ``uname -r`` string.

    A missing patch component is treated as 0. Returns None when the string
    does not start with ``<digits>.<digits>``.
    """
    match = re.match(r'\s*(\d+)\.(\d+)(?:\.(\d+))?', release)
    if match is None:
        return None
    return tuple(int(part) if part else 0 for part in match.groups())


def check_for_breakout_techniques(kernel_version=None):
    """Flag known issues based on kernel and Docker version strings.

    Args:
        kernel_version: Kernel release string to evaluate. When None, the
            output of ``uname -r`` is used.

    Returns:
        dict: Maps ``dirtycow``, ``shocker``, ``dirtypipe`` and
        ``runc_escape`` to booleans. ``shocker`` is never set by this
        function. The flags are version heuristics only.
    """
    techniques = {
        'dirtycow': False,
        'shocker': False,
        'dirtypipe': False,
        'runc_escape': False,
    }

    # Kernel series flagged for Dirty COW (CVE-2016-5195).
    dirtycow_series = {(3, 13), (3, 16), (3, 19), (4, 4), (4, 8)}

    if kernel_version is None:
        try:
            kernel_version = subprocess.getoutput('uname -r')
        except OSError:
            kernel_version = ''

    parsed = _parse_kernel_release(kernel_version)
    if parsed is not None:
        # Compare the parsed major.minor pair, not substrings: a substring
        # test reports e.g. "5.4.44" as a 4.4 kernel.
        techniques['dirtycow'] = parsed[:2] in dirtycow_series

        # Dirty Pipe (CVE-2022-0847): numeric range check. Comparing the raw
        # strings is lexicographic ("5.10" sorts before "5.8") and never
        # matches. 5.16.11 carries the fix, so the upper bound is exclusive.
        # NOTE(review): fixes backported to stable series are not considered.
        techniques['dirtypipe'] = (5, 8, 0) <= parsed < (5, 16, 11)

    # runc (CVE-2019-5736): approximated via the Docker version string;
    # the runc version itself is not inspected.
    try:
        docker_version = subprocess.getoutput(
            'docker version 2>/dev/null | grep Version | head -1')
    except OSError:
        docker_version = ''
    if '18.09' in docker_version:
        techniques['runc_escape'] = True

    return techniques
|
|
|
|
def main():
    """Run every check, print a readable report and return it as JSON.

    The report has four printed sections (privileges, probe results, version
    based findings, recommendations).

    Returns:
        str: JSON document (indent 2) with the keys ``privileges``,
        ``escape_attempts``, ``vulnerabilities``, ``timestamp`` and
        ``recommendations``.
    """
    import time

    separator = "=" * 60
    print("[Container Escape Assessment]")
    print(separator)

    # Section 1: privilege summary.
    print("\n[1] Checking container privileges...")
    privileges = check_privileges()

    summary_rows = (
        ("Privileged", 'privileged', 'Unknown'),
        ("Running as root", 'is_root', False),
        ("Capabilities", 'capabilities', 'Unknown'),
    )
    for label, key, fallback in summary_rows:
        print(f" {label}: {privileges.get(key, fallback)}")

    found_mounts = privileges.get('sensitive_mounts')
    if found_mounts:
        print(f" Sensitive mounts: {found_mounts}")

    # Section 2: run all probes first, then print their outcomes.
    print("\n[2] Attempting escape techniques...")

    probes = (
        ('Docker Socket', attempt_docker_socket_escape),
        ('Cgroup release_agent', attempt_cgroup_escape),
        ('Device Access', attempt_device_escape),
        ('Kernel Module', attempt_kernel_module_load),
        ('Mount Escape', attempt_mount_escape),
    )
    escapes = {label: probe() for label, probe in probes}

    for name, outcome in escapes.items():
        status = 'SUCCESS' if outcome.get('success') else 'FAILED'
        print(f" {name}: {status}")
        details = outcome.get('details')
        if details:
            print(f" Details: {details[:80]}...")

    # Section 3: version-based findings.
    print("\n[3] Checking for known vulnerabilities...")
    vulnerabilities = check_for_breakout_techniques()

    for vuln, present in vulnerabilities.items():
        verdict = 'POSSIBLE' if present else 'Not detected'
        print(f" {vuln}: {verdict}")

    # Section 4: each finding carries its two printed lines and the tag that
    # goes into the JSON 'recommendations' list.
    print("\n[4] Recommendations:")

    findings = (
        (
            privileges.get('privileged'),
            " ⚠️ Container is PRIVILEGED - Easy escape possible",
            " → Use docker.sock to create new privileged container",
            'privileged_container_escape',
        ),
        (
            escapes['Docker Socket'].get('success'),
            " ⚠️ Docker socket accessible - Full host control",
            " → Use Docker API to create privileged containers",
            'docker_socket_escape',
        ),
        (
            escapes['Cgroup release_agent'].get('success'),
            " ⚠️ Cgroup release_agent writable - Kernel escape possible",
            " → Use release_agent to execute commands on host",
            'cgroup_escape',
        ),
        (
            privileges.get('is_root'),
            " ⚠️ Running as root - More escape options available",
            " → Try all root-based escape techniques",
            'root_escape_techniques',
        ),
    )

    recommendations = []
    for triggered, warning, hint, tag in findings:
        if not triggered:
            continue
        print(warning)
        print(hint)
        recommendations.append(tag)

    # Assemble the machine-readable report.
    result = {
        'privileges': privileges,
        'escape_attempts': escapes,
        'vulnerabilities': vulnerabilities,
        'timestamp': time.time(),
        'recommendations': recommendations,
    }

    return json.dumps(result, indent=2)
|
|
|
|
if __name__ == "__main__":
    output = main()
    print("\n" + "=" * 60)
    # main() only returns the JSON report; print it so the result of the run
    # is not silently discarded.
    print(output)
    print("[*] Assessment complete - Output ready for exfiltration")
|