CRITICAL fixes: - Auto-generated self-signed TLS certs (HTTPS/WSS by default) - Removed session_key from /srp/verify response (was sent in plaintext) - Replaced with HMAC-SHA256 ws_token for WebSocket authentication HIGH fixes: - WebSocket auth now validates ws_token via hmac.compare_digest() - /clear endpoint requires Bearer admin_token (printed at server start) - Password no longer required as CLI arg — supports env var + getpass prompt - Removed user_ip from Message model (no longer broadcast to clients) MEDIUM fixes: - Rate limiter on /srp/init and /srp/verify (10 req/min/IP) - MessageStore capped at 1000 messages (prevents RAM DoS) - access_log disabled (was leaking request metadata) LOW fixes: - Username sanitization against rich markup injection - Dead code removed from helpers.py All 79 tests passing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
60 lines
1.7 KiB
Python
60 lines
1.7 KiB
Python
from typing import Optional
|
|
from .models import Message, UserSession
|
|
|
|
|
|
class MessageStore:
|
|
def __init__(self, max_messages: int = 1000):
|
|
self._messages: list[Message] = []
|
|
self._max = max_messages
|
|
|
|
def add(self, message: Message) -> None:
|
|
self._messages.append(message)
|
|
if len(self._messages) > self._max:
|
|
self._messages = self._messages[-self._max:]
|
|
|
|
def get_all(self) -> list[Message]:
|
|
return self._messages.copy()
|
|
|
|
def clear(self) -> None:
|
|
count = len(self._messages)
|
|
self._messages.clear()
|
|
|
|
def count(self) -> int:
|
|
return len(self._messages)
|
|
|
|
|
|
class UserSessionStore:
|
|
def __init__(self):
|
|
self._sessions: dict[str, UserSession] = {}
|
|
|
|
def add(self, session: UserSession) -> None:
|
|
self._sessions[session.user_id] = session
|
|
|
|
def get(self, user_id: str) -> Optional[UserSession]:
|
|
return self._sessions.get(user_id)
|
|
|
|
def update_activity(self, user_id: str) -> None:
|
|
if session := self._sessions.get(user_id):
|
|
session.update_activity()
|
|
|
|
def remove(self, user_id: str) -> None:
|
|
if user_id in self._sessions:
|
|
del self._sessions[user_id]
|
|
|
|
def cleanup_stale(self, timeout_seconds: int = 3600) -> int:
|
|
stale_ids = [
|
|
uid for uid, s in self._sessions.items() if s.is_stale(timeout_seconds)
|
|
]
|
|
for uid in stale_ids:
|
|
del self._sessions[uid]
|
|
return len(stale_ids)
|
|
|
|
def get_all(self) -> list[UserSession]:
|
|
return list(self._sessions.values())
|
|
|
|
def count(self) -> int:
|
|
return len(self._sessions)
|
|
|
|
def username_exists(self, username: str) -> bool:
|
|
return any(s.username == username for s in self._sessions.values())
|