hack-house/cmd_chat/server/server.py
leetcrypt e7bacc93da fix(security): comprehensive security hardening — TLS, HMAC WS auth, rate limiting, IP leak prevention
CRITICAL fixes:
- Auto-generated self-signed TLS certs (HTTPS/WSS by default)
- Removed session_key from /srp/verify response (was sent in plaintext)
- Replaced with HMAC-SHA256 ws_token for WebSocket authentication

HIGH fixes:
- WebSocket auth now validates ws_token via hmac.compare_digest()
- /clear endpoint requires Bearer admin_token (printed at server start)
- Password no longer required as CLI arg — supports env var + getpass prompt
- Removed user_ip from Message model (no longer broadcast to clients)

MEDIUM fixes:
- Rate limiter on /srp/init and /srp/verify (10 req/min/IP)
- MessageStore capped at 1000 messages (prevents RAM DoS)
- access_log disabled (was leaking request metadata)

LOW fixes:
- Username sanitization against rich markup injection
- Dead code removed from helpers.py

All 79 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-25 20:30:40 -07:00

98 lines
2.9 KiB
Python

import ipaddress
import ssl
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from .factory import create_app
DEFAULT_CERT_DIR = Path.home() / ".cmd-chat" / "certs"
def ensure_tls_certs(cert_dir: Path = DEFAULT_CERT_DIR) -> tuple[Path, Path]:
cert_dir.mkdir(parents=True, exist_ok=True)
cert_path = cert_dir / "server.pem"
key_path = cert_dir / "server-key.pem"
if cert_path.exists() and key_path.exists():
return cert_path, key_path
key = ec.generate_private_key(ec.SECP256R1())
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, "cmd-chat"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "cmd-chat-self-signed"),
])
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(timezone.utc))
.not_valid_after(datetime.now(timezone.utc) + timedelta(days=365))
.add_extension(
x509.SubjectAlternativeName([
x509.DNSName("localhost"),
x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
]),
critical=False,
)
.sign(key, hashes.SHA256())
)
key_path.write_bytes(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)
cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM))
return cert_path, key_path
def run_server(
host: str = "0.0.0.0",
port: int = 8000,
password: Optional[str] = None,
cert_path: Optional[str] = None,
key_path: Optional[str] = None,
no_tls: bool = False,
) -> None:
app = create_app(password=password or "")
ssl_ctx = None
if not no_tls:
if cert_path and key_path:
c, k = Path(cert_path), Path(key_path)
else:
c, k = ensure_tls_certs()
print(f"[TLS] Auto-generated self-signed cert: {c}")
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_ctx.load_cert_chain(str(c), str(k))
protocol = "https"
else:
print("[WARNING] TLS disabled — all traffic is unencrypted!")
protocol = "http"
print(f"[ADMIN] Clear token: {app.ctx.admin_token}")
print(f"[SERVER] Listening on {protocol}://{host}:{port}")
app.run(
host=host,
port=port,
single_process=True,
debug=False,
access_log=False,
ssl=ssl_ctx,
)