Fix abstract renderer signatures and add small stubs so type checkers can
see expected attributes (e.g. username, _decrypt). This removes several
mypy false-positives that were caused by mixin/ABC mismatches.
Preserve message text containing ':' by using split(':', 1) in both
DefaultClientRenderer and RichClientRenderer.
Normalize renderer APIs: print_chat(...) now takes the response mapping
and returns None (matches runtime behavior).
Make RSA symmetric-key request more robust: read r.content instead of a
fixed-size r.raw.read(999), avoiding truncated key material.
Improve _connect_ws exception handling in client to ensure a valid
Exception is re-raised if connection attempts fail.
Correct server/service typing: memory_msgs is now typed as
list[Message] and we null-check incoming payload text before creating a
new Message.
Replace manual package list in setup.py with setuptools.find_packages()
so packaging uses valid Python package names.
Installed types-requests in the project venv so mypy no longer flags the
requests import.
Verification: ran python -m compileall and mypy cmd_chat — no issues
remain.
Notes:
Wire format still uses Python literal evaluation in some places (existing
behavior); switching to JSON for client/server payloads is recommended as a
follow-up for robustness and security.
44 lines
1.4 KiB
Python
44 lines
1.4 KiB
Python
import rsa
|
|
import requests
|
|
from cryptography.fernet import Fernet
|
|
|
|
from cmd_chat.client.core.abs.abs_crypto import CryptoService
|
|
|
|
|
|
class RSAService(CryptoService):
|
|
def __init__(self):
|
|
self.public_key = None
|
|
self.private_key = None
|
|
self.symmetric_key = None
|
|
self.fernet = None
|
|
self._generate_keys()
|
|
|
|
def _encrypt(self, message: str) -> str:
|
|
return self.fernet.encrypt(message.encode()).decode("utf-8")
|
|
|
|
def _decrypt(self, message: str) -> str:
|
|
return self.fernet.decrypt(message.encode()).decode("utf-8")
|
|
|
|
def _request_key(self, url: str, username: str, password: str | None = None):
|
|
pubkey_bytes = self.public_key.save_pkcs1()
|
|
r = requests.post(
|
|
url,
|
|
files={"pubkey": ("public.pem", pubkey_bytes)},
|
|
data={"username": username, "password": password or ""},
|
|
stream=True,
|
|
)
|
|
r.raise_for_status()
|
|
# read the full response content (server returns encrypted symmetric key)
|
|
message = r.content
|
|
self.symmetric_key = rsa.decrypt(message, self.private_key)
|
|
self.fernet = Fernet(self.symmetric_key)
|
|
|
|
def _generate_keys(self):
|
|
self.public_key, self.private_key = rsa.newkeys(512)
|
|
|
|
def _get_generated_keys(self):
|
|
return self.private_key, self.public_key
|
|
|
|
def _remove_keys(self):
|
|
self.public_key = None
|
|
self.private_key = None |