hack-house/cmd_chat/server/views.py
leetcrypt e7bacc93da fix(security): comprehensive security hardening — TLS, HMAC WS auth, rate limiting, IP leak prevention
CRITICAL fixes:
- Auto-generated self-signed TLS certs (HTTPS/WSS by default)
- Removed session_key from /srp/verify response (was sent in plaintext)
- Replaced with HMAC-SHA256 ws_token for WebSocket authentication

HIGH fixes:
- WebSocket auth now validates ws_token via hmac.compare_digest()
- /clear endpoint requires Bearer admin_token (printed at server start)
- Password no longer required as CLI arg — supports env var + getpass prompt
- Removed user_ip from Message model (no longer broadcast to clients)

MEDIUM fixes:
- Rate limiter on /srp/init and /srp/verify (10 req/min/IP)
- MessageStore capped at 1000 messages (prevents RAM DoS)
- access_log disabled (was leaking request metadata)

LOW fixes:
- Username sanitization against rich markup injection
- Dead code removed from helpers.py

All 79 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-25 20:30:40 -07:00

175 lines
5.2 KiB
Python

import base64
import hashlib
import hmac
import json
import logging
from dataclasses import asdict

from sanic import Sanic, Request, response, Websocket
from sanic.response import HTTPResponse, json as json_response

from .models import Message, UserSession
from .helpers import get_client_ip, send_state, utcnow
def generate_ws_token(user_id: str, secret: bytes) -> str:
    """Derive the WebSocket auth token for *user_id*.

    The token is the lowercase hex HMAC-SHA256 of the UTF-8 encoded user id,
    keyed with *secret*.
    """
    mac = hmac.new(key=secret, msg=user_id.encode(), digestmod=hashlib.sha256)
    return mac.hexdigest()
async def srp_init(request: Request, app: Sanic) -> HTTPResponse:
    """Start an SRP handshake for a prospective chat participant.

    Expects a JSON object with ``A`` (the client's public SRP value, base64
    encoded) and an optional ``username`` (defaults to ``"unknown"``).

    Returns:
        200 with ``user_id``, ``B``, ``salt`` and ``room_salt`` (binary values
        base64 encoded) on success; 429 when the rate limiter rejects the
        client IP; 400 when the body is not a JSON object, ``A`` is missing or
        not decodable base64, or ``username`` is not a string; 409 when the
        username is already present in the session store; 500 on any other
        failure.
    """
    try:
        client_ip = get_client_ip(request)
        if not app.ctx.rate_limiter.is_allowed(client_ip):
            return response.json({"error": "Rate limited"}, status=429)

        data = request.json or {}
        if not isinstance(data, dict):
            # A JSON array/scalar has no .get(); reject it as a client error
            # instead of letting it surface as a 500.
            return response.json({"error": "Invalid request body"}, status=400)

        username = data.get("username", "unknown")
        client_public_b64 = data.get("A")
        if not client_public_b64:
            return response.json({"error": "Missing A"}, status=400)
        if not isinstance(username, str):
            return response.json({"error": "Invalid username"}, status=400)

        try:
            client_public = base64.b64decode(client_public_b64)
        except (ValueError, TypeError):
            # binascii.Error (bad padding / non-ASCII) is a ValueError;
            # TypeError covers a non-string "A". Both are the client's fault.
            return response.json({"error": "Invalid A"}, status=400)

        if app.ctx.session_store.username_exists(username):
            return response.json({"error": "Username taken"}, status=409)

        user_id, B, salt = app.ctx.srp_manager.init_auth(username, client_public)
        return response.json(
            {
                "user_id": user_id,
                "B": base64.b64encode(B).decode(),
                "salt": base64.b64encode(salt).decode(),
                "room_salt": base64.b64encode(app.ctx.room_salt).decode(),
            }
        )
    except Exception:
        # Top-level boundary: keep internals out of the response, but leave a
        # server-side trace so the failure can be diagnosed.
        logging.getLogger(__name__).exception("SRP init failed")
        return response.json({"error": "SRP init failed"}, status=500)
async def srp_verify(request: Request, app: Sanic) -> HTTPResponse:
    """Finish the SRP handshake and register the authenticated session.

    Expects a JSON object with ``user_id``, ``M`` (the client's proof, base64
    encoded) and an optional ``username`` (defaults to ``"unknown"``).

    On success a ``UserSession`` is added to the session store and the
    response carries ``H_AMK`` (base64) and a ``ws_token`` for the WebSocket
    endpoint; the session key itself is never returned.

    Returns:
        200 on success; 429 when the rate limiter rejects the client IP; 400
        when the body is not a JSON object or a field is missing, of the wrong
        type, or not decodable base64; 401 when a ``ValueError`` is raised
        while verifying (its message is returned to the client); 409 when the
        username is already present in the session store; 500 on any other
        failure.
    """
    try:
        client_ip = get_client_ip(request)
        if not app.ctx.rate_limiter.is_allowed(client_ip):
            return response.json({"error": "Rate limited"}, status=429)

        data = request.json or {}
        if not isinstance(data, dict):
            return response.json({"error": "Invalid request body"}, status=400)

        user_id = data.get("user_id")
        client_proof_b64 = data.get("M")
        username = data.get("username", "unknown")
        if not user_id or not client_proof_b64:
            return response.json({"error": "Missing user_id or M"}, status=400)
        if not isinstance(user_id, str) or not isinstance(username, str):
            return response.json(
                {"error": "Invalid user_id or username"}, status=400
            )

        try:
            client_proof = base64.b64decode(client_proof_b64)
        except (ValueError, TypeError):
            # Handled here so a malformed proof is a 400, rather than falling
            # into the ValueError -> 401 branch below with the decoder's
            # internal message (binascii.Error is a ValueError).
            return response.json({"error": "Invalid M"}, status=400)

        H_AMK, session_key = app.ctx.srp_manager.verify_auth(user_id, client_proof)

        # srp_init rejects taken usernames, but the username used for the
        # session comes from THIS request, so enforce uniqueness again here.
        # NOTE(review): ideally the username bound at init time would be
        # reused instead of trusting the verify payload -- confirm whether
        # srp_manager exposes it.
        if app.ctx.session_store.username_exists(username):
            return response.json({"error": "Username taken"}, status=409)

        # The first 32 bytes of the session key, urlsafe-base64 encoded.
        fernet_key = base64.urlsafe_b64encode(session_key[:32])
        session = UserSession(
            user_id=user_id,
            ip=client_ip,
            username=username,
            fernet_key=fernet_key,
        )
        app.ctx.session_store.add(session)

        ws_token = generate_ws_token(user_id, app.ctx.ws_secret)
        return response.json(
            {
                "H_AMK": base64.b64encode(H_AMK).decode(),
                "ws_token": ws_token,
            }
        )
    except ValueError as e:
        # Presumably raised by verify_auth for a bad proof or unknown user_id
        # -- verify against srp_manager.
        return response.json({"error": str(e)}, status=401)
    except Exception:
        # Top-level boundary: keep internals out of the response, but leave a
        # server-side trace so the failure can be diagnosed.
        logging.getLogger(__name__).exception("SRP verify failed")
        return response.json({"error": "SRP verify failed"}, status=500)
async def chat_ws(request: Request, ws: Websocket, app: Sanic) -> None:
    """Serve one authenticated chat WebSocket connection.

    The client passes ``user_id`` and ``ws_token`` as query arguments. The
    token must equal ``generate_ws_token(user_id, app.ctx.ws_secret)`` and a
    session for ``user_id`` must exist in the session store. The socket is
    closed with 4002 when arguments or the session are missing and with 4003
    when the token does not match.

    After connecting, the current state is sent to the client, and every frame
    received is stored as a ``Message`` and broadcast to all connections. When
    the loop ends for any reason the connection is unregistered and a
    ``user_left`` event is broadcast.
    """
    user_id = request.args.get("user_id")
    ws_token = request.args.get("ws_token")
    if not user_id or not ws_token:
        await ws.close(code=4002, reason="user_id and ws_token required")
        return

    expected_token = generate_ws_token(user_id, app.ctx.ws_secret)
    # hmac.compare_digest raises TypeError for non-ASCII str arguments. The
    # expected token is a hex digest (always ASCII), so a non-ASCII token can
    # never match; reject it up front instead of crashing the handler.
    if not ws_token.isascii() or not hmac.compare_digest(ws_token, expected_token):
        await ws.close(code=4003, reason="Invalid token")
        return

    session = app.ctx.session_store.get(user_id)
    if not session:
        await ws.close(code=4002, reason="Invalid session")
        return

    manager = app.ctx.connection_manager
    await manager.connect(user_id, ws)
    try:
        await send_state(ws, app)
        async for data in ws:
            if data is None:
                break
            app.ctx.session_store.update_activity(user_id)
            message = Message(
                text=str(data),
                username=session.username,
            )
            app.ctx.message_store.add(message)
            await manager.broadcast(
                json.dumps(
                    {
                        "type": "message",
                        "data": asdict(message),
                    }
                )
            )
    except Exception:
        # Best effort: a failing connection must not propagate, but record
        # why it ended. Debug level, and no user data in the message, because
        # ordinary disconnects are expected to land here too.
        logging.getLogger(__name__).debug(
            "WebSocket handler terminated by an exception", exc_info=True
        )
    finally:
        await manager.disconnect(user_id)
        await manager.broadcast(
            json.dumps(
                {
                    "type": "user_left",
                    "user_id": user_id,
                }
            )
        )
async def health(request: Request, app: Sanic) -> HTTPResponse:
    """Report service status with the current message and session counts.

    The JSON body holds a fixed ``status`` of ``"ok"``, the counts reported by
    the message and session stores, and ``utcnow()`` in ISO 8601 form.
    """
    ctx = app.ctx
    payload = {
        "status": "ok",
        "messages": ctx.message_store.count(),
        "users": ctx.session_store.count(),
        "timestamp": utcnow().isoformat(),
    }
    return json_response(payload)
async def clear_messages(request: Request, app: Sanic) -> HTTPResponse:
    """Clear the message store for a caller presenting the admin token.

    The request must carry ``Authorization: Bearer <token>`` where ``<token>``
    equals ``app.ctx.admin_token``. The scheme name is matched
    case-insensitively, as HTTP authentication schemes are case-insensitive.

    Returns:
        200 with ``{"status": "cleared"}`` after clearing the store; 401 with
        ``{"error": "Unauthorized"}`` when the header is missing, uses another
        scheme, or the token does not match.
    """
    auth_header = request.headers.get("authorization", "")
    scheme, separator, token = auth_header.partition(" ")
    if not separator or scheme.lower() != "bearer":
        return response.json({"error": "Unauthorized"}, status=401)

    # hmac.compare_digest raises TypeError for non-ASCII str arguments, which
    # would turn a bad header into a 500. Whether the supplied token is ASCII
    # is known to the caller anyway, so checking it first leaks nothing.
    if not token.isascii() or not hmac.compare_digest(token, app.ctx.admin_token):
        return response.json({"error": "Unauthorized"}, status=401)

    app.ctx.message_store.clear()
    return json_response({"status": "cleared"})