hack-house/cmd_chat/client/core/crypto.py
mirai 64b0967292 Fix renderer typing, preserve message text, and harden crypto key handling
Fix abstract renderer signatures and add small stubs so type checkers can
see expected attributes (e.g. username, _decrypt). This removes several
mypy false-positives that were caused by mixin/ABC mismatches.
Preserve message text containing ':' by using split(':', 1) in both
DefaultClientRenderer and RichClientRenderer.
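To illustrate the `split(':', 1)` change, here is a minimal sketch. It assumes a rendered line has the form `username: text`; the helper name `split_message` is hypothetical and is not part of either renderer.

```python
def split_message(line: str) -> tuple[str, str]:
    """Split 'username: text' on the first colon only (hypothetical helper)."""
    # maxsplit=1 stops after the first ':' so later colons stay in the text.
    parts = line.split(":", 1)
    if len(parts) == 1:
        # No colon at all: treat the whole line as message text.
        return "", parts[0].strip()
    username, text = parts
    return username.strip(), text.strip()


# An unbounded split would cut the timestamp-like text apart.
print("alice: meet at 10:30".split(":"))
print(split_message("alice: meet at 10:30"))
```

With an unbounded `split(':')`, the text after the second colon ends up in a separate list element and is easy to drop when unpacking only the first two fields.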
Normalize renderer APIs: print_chat(...) now takes the response mapping
and returns None (matches runtime behavior).
Make RSA symmetric-key request more robust: read r.content instead of a
fixed-size r.raw.read(999), avoiding truncated key material.
Improve _connect_ws exception handling in client to ensure a valid
Exception is re-raised if connection attempts fail.
Correct server/service typing: memory_msgs is now typed as
list[Message] and we null-check incoming payload text before creating a
new Message.
Replace manual package list in setup.py with setuptools.find_packages()
so packaging uses valid Python package names.
Installed types-requests in the project venv so mypy no longer flags the
requests import.
Verification: ran python -m compileall and mypy cmd_chat — no issues
remain.
Notes:

Wire format still uses Python literal evaluation in some places (existing
behavior); switching to JSON for client/server payloads is recommended as a
follow-up for robustness and security.
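As a rough sketch of the follow-up suggested in the note above, a payload could be serialized with the standard `json` module rather than parsed with Python literal evaluation. The field names (`username`, `text`) and the helper names are assumptions for illustration; the actual payload shape used by the client and server is not shown here.

```python
import json


def encode_payload(username: str, text: str) -> str:
    """Serialize a chat payload to a JSON string (hypothetical wire format)."""
    return json.dumps({"username": username, "text": text})


def decode_payload(raw: str) -> dict[str, str]:
    """Parse and validate a JSON payload; raises ValueError on bad input."""
    # json.loads only builds plain data (dict, list, str, numbers, bool, None).
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")
    username = data.get("username")
    text = data.get("text")
    if not isinstance(username, str) or not isinstance(text, str):
        raise ValueError("payload needs string 'username' and 'text' fields")
    return {"username": username, "text": text}


wire = encode_payload("alice", "meet at 10:30")
print(decode_payload(wire))
```

Malformed input surfaces as a `ValueError` (`json.JSONDecodeError` is a subclass), so the caller can reject it in one place.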
2025-11-05 19:29:24 +05:30


import rsa
import requests
from cryptography.fernet import Fernet

from cmd_chat.client.core.abs.abs_crypto import CryptoService


class RSAService(CryptoService):
    """RSA key pair used to receive a Fernet key that encrypts chat messages."""

    def __init__(self):
        self.public_key = None
        self.private_key = None
        self.symmetric_key = None
        self.fernet = None  # set by _request_key()
        self._generate_keys()

    def _encrypt(self, message: str) -> str:
        # Requires a prior successful _request_key(); self.fernet is None until then.
        return self.fernet.encrypt(message.encode()).decode("utf-8")

    def _decrypt(self, message: str) -> str:
        # Same precondition as _encrypt().
        return self.fernet.decrypt(message.encode()).decode("utf-8")

    def _request_key(self, url: str, username: str, password: str | None = None):
        # Upload the public key; the server replies with the symmetric key
        # encrypted under that public key.
        pubkey_bytes = self.public_key.save_pkcs1()
        r = requests.post(
            url,
            files={"pubkey": ("public.pem", pubkey_bytes)},
            data={"username": username, "password": password or ""},
            stream=True,
        )
        r.raise_for_status()
        # Read the full response content (server returns encrypted symmetric key).
        message = r.content
        self.symmetric_key = rsa.decrypt(message, self.private_key)
        self.fernet = Fernet(self.symmetric_key)

    def _generate_keys(self):
        # 512-bit keys are fast to generate but weak by current standards.
        self.public_key, self.private_key = rsa.newkeys(512)

    def _get_generated_keys(self):
        return self.private_key, self.public_key

    def _remove_keys(self):
        # Clears only the RSA pair; symmetric_key and fernet are left in place.
        self.public_key = None
        self.private_key = None
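
A note on `rsa.newkeys(512)`: the sketch below uses only the standard library to show why a 512-bit modulus is still large enough to carry the symmetric key. It assumes the server sends a standard Fernet key (32 random bytes, URL-safe base64 encoded) and that the key is wrapped with PKCS#1 v1.5 padding, which is what `rsa.decrypt` expects and which costs 11 bytes of overhead. The helper names are hypothetical.

```python
import base64
import os

PKCS1_V15_OVERHEAD = 11  # minimum padding bytes for PKCS#1 v1.5 encryption


def max_plaintext_bytes(modulus_bits: int) -> int:
    """Largest message that fits in one RSA block with PKCS#1 v1.5 padding."""
    modulus_bytes = (modulus_bits + 7) // 8
    return modulus_bytes - PKCS1_V15_OVERHEAD


def fernet_key_length() -> int:
    """Length in bytes of a Fernet-style key: base64 of 32 random bytes."""
    return len(base64.urlsafe_b64encode(os.urandom(32)))


print(fernet_key_length())        # 44
print(max_plaintext_bytes(512))   # 53
print(max_plaintext_bytes(2048))  # 245
```

Under these assumptions a 44-byte key fits in a single 512-bit block with 9 bytes to spare. A larger modulus such as 2048 bits would also fit the key and is much harder to factor, at the cost of slower key generation.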