hack-house/.history/cmd_chat/server/server_20251105191907.py
mirai 64b0967292 Fix renderer typing, preserve message text, and harden crypto key handling
Fix abstract renderer signatures and add small stubs so type checkers can
see expected attributes (e.g. username, _decrypt). This removes several
mypy false-positives that were caused by mixin/ABC mismatches.
Preserve message text containing ':' by using split(':', 1) in both
DefaultClientRenderer and RichClientRenderer.
Normalize renderer APIs: print_chat(...) now takes the response mapping
and returns None (matches runtime behavior).
Make RSA symmetric-key request more robust: read r.content instead of a
fixed-size r.raw.read(999), avoiding truncated key material.
Improve _connect_ws exception handling in client to ensure a valid
Exception is re-raised if connection attempts fail.
Correct server/service typing: memory_msgs is now typed as
list[Message] and we null-check incoming payload text before creating a
new Message.
Replace manual package list in setup.py with setuptools.find_packages()
so packaging uses valid Python package names.
Installed types-requests in the project venv so mypy no longer flags the
requests import.
Verification: ran python -m compileall and mypy cmd_chat — no issues
remain.
Notes:

Wire format still uses Python literal evaluation in some places (existing
behavior); switching to JSON for client/server payloads is recommended as a
follow-up for robustness and security.
2025-11-05 19:29:24 +05:30

114 lines
3.9 KiB
Python

import asyncio
import rsa
from cryptography.fernet import Fernet
from functools import partial
from sanic.worker.loader import AppLoader
from sanic.response import HTTPResponse
from sanic import Sanic, Request, response, Websocket
from cmd_chat.server.models import Message
from cmd_chat.server.services import (
_get_bytes_and_serialize,
_check_ws_for_close_status,
_generate_new_message,
_generate_update_payload
)
app = Sanic("app")
app.config.OAS = False
MESSAGES_MEMORY_DB: list[Message] = []
USERS: dict[str, str] = {}
PUBLIC_KEY = Fernet.generate_key()
def _check_password(request: Request, expected: str | None) -> bool:
if not expected:
return True
q = request.args.get("password")
f = request.form.get("password") if hasattr(request, "form") else None
return (q or f) == expected
def _get_str_arg(request: Request, name: str) -> str | None:
return request.form.get(name) or request.args.get(name)
def attach_endpoints(app: Sanic):
@app.websocket("/talk")
async def talk_ws_view(request: Request, ws: Websocket) -> HTTPResponse:
if not _check_password(request, app.ctx.ADMIN_PASSWORD):
await ws.close(code=4001, reason="unauthorized")
return
while True:
serialized_message: dict = await _get_bytes_and_serialize(ws)
await _check_ws_for_close_status(serialized_message, ws)
text = serialized_message.get("text")
if text is None:
continue
new_message = await _generate_new_message(text)
MESSAGES_MEMORY_DB.append(new_message)
await ws.send(str({"status": "ok"}))
await asyncio.sleep(0.2)
@app.websocket("/update")
async def update_ws_view(request: Request, ws: Websocket) -> HTTPResponse:
if not _check_password(request, app.ctx.ADMIN_PASSWORD):
await ws.close(code=4001, reason="unauthorized")
return
while True:
payload = await _generate_update_payload(MESSAGES_MEMORY_DB, USERS)
await ws.send(payload.encode())
await asyncio.sleep(0.2)
@app.route('/get_key', methods=['GET', 'POST'])
async def get_key_view(request: Request) -> HTTPResponse:
if not _check_password(request, app.ctx.ADMIN_PASSWORD):
return response.text("unauthorized", status=401)
pubkey_bytes: bytes | None = None
if "pubkey" in request.files and request.files.get("pubkey"):
f = request.files.get("pubkey")
if isinstance(f, list):
f = f[0]
pubkey_bytes = f.body
if pubkey_bytes is None:
raw = request.form.get("pubkey")
if raw:
pubkey_bytes = raw if isinstance(raw, bytes) else str(raw).encode()
if pubkey_bytes is None:
raw = request.args.get("pubkey")
if raw:
pubkey_bytes = raw.encode()
if not pubkey_bytes:
return response.text("bad request: pubkey is required", status=400)
try:
public_key = rsa.PublicKey.load_pkcs1(pubkey_bytes)
except Exception as e:
return response.text(f"bad pubkey: {e}", status=400)
encrypted_data = rsa.encrypt(PUBLIC_KEY, public_key)
username = _get_str_arg(request, "username") or "unknown"
user_key = f"{request.ip}, {username}"
if user_key not in USERS:
USERS[user_key] = PUBLIC_KEY
return response.raw(encrypted_data)
def create_app(app_name: str, admin_password: str | None) -> Sanic:
app = Sanic(app_name)
app.ctx.ADMIN_PASSWORD = admin_password
attach_endpoints(app)
return app
def run_server(host: str, port: int, dev: bool = False, admin_password: str | None = None) -> None:
loader = AppLoader(factory=partial(create_app, "CMD_SERVER", admin_password))
app = loader.load()
app.prepare(host=host, port=port, dev=dev)
Sanic.serve(primary=app, app_loader=loader)