Some checks are pending
CI / rust client (hh) (macos-latest) (push) Waiting to run
CI / rust client (hh) (ubuntu-latest) (push) Waiting to run
CI / rust coverage (push) Waiting to run
CI / python server (3.10) (push) Waiting to run
CI / python server (3.11) (push) Waiting to run
CI / python server (3.12) (push) Waiting to run
CI / headless e2e smoke (push) Waiting to run
CI / dependency audit (push) Waiting to run
CI / secret scanning (push) Waiting to run
M1: enforce the declared transfer size (clamped to MAX_SIZE) on chunk receipt in both the Rust and Python clients — a malicious sender can no longer grow the receive buffer unboundedly. M2: only honor X-Forwarded-For when TRUST_PROXY is set, so a direct client can't spoof a source IP to dodge the per-IP rate limiter. M3: evict unverified SRP sessions after a 60s TTL on each new handshake, preventing half-finished auths from exhausting memory. M4: drop WS frames larger than 256 KB before they hit the store or broadcast, bounding per-message memory and flood blast radius. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
89 lines
2.6 KiB
Python
89 lines
2.6 KiB
Python
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
from uuid import uuid4
|
|
|
|
import srp
|
|
|
|
|
|
srp.rfc5054_enable()
|
|
|
|
# Half-finished handshakes (init without a matching verify) would otherwise pile
|
|
# up forever, letting an attacker exhaust memory. Evict any session that hasn't
|
|
# authenticated within this many seconds whenever a new handshake begins.
|
|
UNVERIFIED_TTL_SECONDS = 60
|
|
|
|
|
|
@dataclass
|
|
class SRPSession:
|
|
user_id: str = field(default_factory=lambda: str(uuid4()))
|
|
username: str = ""
|
|
svr: Optional[srp.Verifier] = None
|
|
session_key: Optional[bytes] = None
|
|
authenticated: bool = False
|
|
created_at: float = field(default_factory=time.monotonic)
|
|
|
|
|
|
class SRPAuthManager:
|
|
def __init__(self, password: str):
|
|
self.password = password.encode()
|
|
self.sessions: dict[str, SRPSession] = {}
|
|
self.salt, self.vkey = srp.create_salted_verification_key(
|
|
b"chat", self.password, hash_alg=srp.SHA256
|
|
)
|
|
|
|
def _evict_stale_unverified(self) -> None:
|
|
now = time.monotonic()
|
|
stale = [
|
|
uid
|
|
for uid, s in self.sessions.items()
|
|
if not s.authenticated and now - s.created_at > UNVERIFIED_TTL_SECONDS
|
|
]
|
|
for uid in stale:
|
|
del self.sessions[uid]
|
|
|
|
def init_auth(
|
|
self, username: str, client_public: bytes
|
|
) -> tuple[str, bytes, bytes]:
|
|
self._evict_stale_unverified()
|
|
session = SRPSession(username=username)
|
|
|
|
svr = srp.Verifier(
|
|
b"chat", self.salt, self.vkey, client_public, hash_alg=srp.SHA256
|
|
)
|
|
|
|
s, B = svr.get_challenge()
|
|
|
|
if B is None:
|
|
raise ValueError("SRP challenge generation failed")
|
|
|
|
session.svr = svr
|
|
self.sessions[session.user_id] = session
|
|
|
|
return session.user_id, B, s
|
|
|
|
def verify_auth(self, user_id: str, client_proof: bytes) -> tuple[bytes, bytes]:
|
|
session = self.sessions.get(user_id)
|
|
if not session or not session.svr:
|
|
raise ValueError("Invalid session")
|
|
|
|
H_AMK = session.svr.verify_session(client_proof)
|
|
|
|
if H_AMK is None:
|
|
del self.sessions[user_id]
|
|
raise ValueError("Authentication failed")
|
|
|
|
session.session_key = session.svr.get_session_key()
|
|
session.authenticated = True
|
|
|
|
return H_AMK, session.session_key
|
|
|
|
def get_session(self, user_id: str) -> Optional[SRPSession]:
|
|
session = self.sessions.get(user_id)
|
|
if session and session.authenticated:
|
|
return session
|
|
return None
|
|
|
|
def remove_session(self, user_id: str) -> None:
|
|
self.sessions.pop(user_id, None)
|