hack-house/cmd_chat/server/srp_auth.py
leetcrypt 2c4a4f9a22
Some checks are pending
CI / rust client (hh) (macos-latest) (push) Waiting to run
CI / rust client (hh) (ubuntu-latest) (push) Waiting to run
CI / rust coverage (push) Waiting to run
CI / python server (3.10) (push) Waiting to run
CI / python server (3.11) (push) Waiting to run
CI / python server (3.12) (push) Waiting to run
CI / headless e2e smoke (push) Waiting to run
CI / dependency audit (push) Waiting to run
CI / secret scanning (push) Waiting to run
harden(ft,auth,net): cap transfers/frames, evict stale SRP, distrust XFF
M1: enforce the declared transfer size (clamped to MAX_SIZE) on chunk
receipt in both the Rust and Python clients — a malicious sender can no
longer grow the receive buffer unboundedly.
M2: only honor X-Forwarded-For when TRUST_PROXY is set, so a direct
client can't spoof a source IP to dodge the per-IP rate limiter.
M3: evict unverified SRP sessions after a 60s TTL on each new handshake,
preventing half-finished auths from exhausting memory.
M4: drop WS frames larger than 256 KB before they hit the store or
broadcast, bounding per-message memory and flood blast radius.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-05 06:59:16 -07:00

89 lines
2.6 KiB
Python

import time
from dataclasses import dataclass, field
from typing import Optional
from uuid import uuid4
import srp
srp.rfc5054_enable()
# Half-finished handshakes (init without a matching verify) would otherwise pile
# up forever, letting an attacker exhaust memory. Evict any session that hasn't
# authenticated within this many seconds whenever a new handshake begins.
UNVERIFIED_TTL_SECONDS = 60
@dataclass
class SRPSession:
user_id: str = field(default_factory=lambda: str(uuid4()))
username: str = ""
svr: Optional[srp.Verifier] = None
session_key: Optional[bytes] = None
authenticated: bool = False
created_at: float = field(default_factory=time.monotonic)
class SRPAuthManager:
def __init__(self, password: str):
self.password = password.encode()
self.sessions: dict[str, SRPSession] = {}
self.salt, self.vkey = srp.create_salted_verification_key(
b"chat", self.password, hash_alg=srp.SHA256
)
def _evict_stale_unverified(self) -> None:
now = time.monotonic()
stale = [
uid
for uid, s in self.sessions.items()
if not s.authenticated and now - s.created_at > UNVERIFIED_TTL_SECONDS
]
for uid in stale:
del self.sessions[uid]
def init_auth(
self, username: str, client_public: bytes
) -> tuple[str, bytes, bytes]:
self._evict_stale_unverified()
session = SRPSession(username=username)
svr = srp.Verifier(
b"chat", self.salt, self.vkey, client_public, hash_alg=srp.SHA256
)
s, B = svr.get_challenge()
if B is None:
raise ValueError("SRP challenge generation failed")
session.svr = svr
self.sessions[session.user_id] = session
return session.user_id, B, s
def verify_auth(self, user_id: str, client_proof: bytes) -> tuple[bytes, bytes]:
session = self.sessions.get(user_id)
if not session or not session.svr:
raise ValueError("Invalid session")
H_AMK = session.svr.verify_session(client_proof)
if H_AMK is None:
del self.sessions[user_id]
raise ValueError("Authentication failed")
session.session_key = session.svr.get_session_key()
session.authenticated = True
return H_AMK, session.session_key
def get_session(self, user_id: str) -> Optional[SRPSession]:
session = self.sessions.get(user_id)
if session and session.authenticated:
return session
return None
def remove_session(self, user_id: str) -> None:
self.sessions.pop(user_id, None)