import asyncio
import json
import traceback
from typing import Optional
from urllib.parse import quote, urlencode

import requests
import rsa
import websockets
from cryptography.fernet import Fernet
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel


class Client:
    """Terminal chat client: HTTP key exchange followed by a WebSocket session.

    The client generates an RSA key pair, posts the public key to
    ``/get_key`` and RSA-decrypts the response body into a Fernet key. It then
    opens ``/ws/chat`` and runs two concurrent loops: one rendering incoming
    JSON frames, one forwarding lines typed on stdin.
    """

    # Number of most recent messages shown by render_messages().
    MAX_VISIBLE_MESSAGES = 15

    def __init__(
        self, server: str, port: int, username: str, password: Optional[str] = None
    ):
        """Store connection settings and initialise empty session state.

        Args:
            server: Host name or address of the chat server.
            port: TCP port used for both the HTTP and WebSocket endpoints.
            username: Name sent to the server during key exchange.
            password: Optional password; ``None`` is stored as ``""``.
        """
        self.server = server
        self.port = port
        self.username = username
        self.password = password or ""
        # Taken from the ``X-User-Id`` response header in exchange_keys().
        self.user_id: Optional[str] = None
        self.public_key: Optional[rsa.PublicKey] = None
        self.private_key: Optional[rsa.PrivateKey] = None
        # NOTE(review): the Fernet instance is created in exchange_keys() but
        # no method of this class encrypts or decrypts with it -- confirm
        # whether chat payloads are meant to be encrypted.
        self.fernet: Optional[Fernet] = None
        self.console = Console()
        self.messages: list[dict] = []
        self.users: list[dict] = []
        self.connected = False
        self.running = False

    @property
    def base_url(self) -> str:
        """HTTP base URL of the server."""
        return f"http://{self.server}:{self.port}"

    @property
    def ws_url(self) -> str:
        """WebSocket base URL of the server."""
        return f"ws://{self.server}:{self.port}"

    def success(self, message: str) -> None:
        """Print *message* in green with a check mark (Rich markup is honoured)."""
        self.console.print(f"[green]✓ {message}[/]")

    def error(self, message: str) -> None:
        """Print *message* in red with a cross (Rich markup is honoured)."""
        self.console.print(f"[red]✗ {message}[/]")

    def info(self, message: str) -> None:
        """Print *message* in cyan with a bullet (Rich markup is honoured)."""
        self.console.print(f"[cyan]• {message}[/]")

    def generate_keys(self) -> None:
        """Generate a 2048-bit RSA key pair and store it on the instance."""
        with self.console.status(
            "[cyan]Generating RSA keys (2048 bit)...[/]", spinner="dots"
        ):
            self.public_key, self.private_key = rsa.newkeys(2048)
        self.success("RSA keys generated")

    def exchange_keys(self) -> None:
        """Send the public key to the server and install the returned Fernet key.

        Raises:
            RuntimeError: If generate_keys() has not been called, or the
                server response carries no ``X-User-Id`` header.
            requests.exceptions.RequestException: On transport or HTTP errors.
        """
        if self.public_key is None or self.private_key is None:
            raise RuntimeError("RSA keys have not been generated")

        with self.console.status(
            "[cyan]Exchanging keys with server...[/]", spinner="dots"
        ):
            pubkey_bytes = self.public_key.save_pkcs1()
            response = requests.post(
                f"{self.base_url}/get_key",
                files={"pubkey": ("key.pem", pubkey_bytes)},
                data={"username": self.username, "password": self.password},
                timeout=30,
            )
            response.raise_for_status()

            user_id = response.headers.get("X-User-Id")
            if not user_id:
                # Fail with a clear message instead of a TypeError when the
                # session id is sliced for display below.
                raise RuntimeError("Server response is missing the X-User-Id header")
            self.user_id = user_id

            # The response body is the symmetric key, RSA-encrypted for us.
            symmetric_key = rsa.decrypt(response.content, self.private_key)
            self.fernet = Fernet(symmetric_key)

        self.success(
            f"Key exchange complete (session: {escape(self.user_id[:8])}...)"
        )
        # The RSA pair is only needed for the exchange; drop the references.
        self.public_key = None
        self.private_key = None

    def render_messages(self) -> None:
        """Clear the screen and redraw the user list and the latest messages.

        Usernames, message text and timestamps come from the server, so they
        are escaped before being embedded in Rich markup; otherwise a message
        containing e.g. ``[/]`` would raise a markup error or restyle output.
        """
        self.console.clear()
        users_online = (
            ", ".join(str(u.get("username", "?")) for u in self.users) or "none"
        )
        self.console.print(f"[dim]Online: {escape(users_online)}[/]")
        self.console.print("─" * 60)

        # A negative slice already copes with lists shorter than the window.
        display_messages = self.messages[-self.MAX_VISIBLE_MESSAGES:]
        for msg in display_messages:
            username = str(msg.get("username", "unknown"))
            text = str(msg.get("text", ""))
            # Keep "YYYY-MM-DD HH:MM:SS"; presumably timestamps are ISO 8601.
            timestamp = str(msg.get("timestamp", ""))[:19].replace("T", " ")
            style = "green" if username == self.username else "cyan"
            self.console.print(
                f"[dim]{escape(timestamp)}[/] "
                f"[{style}]{escape(username)}[/]: {escape(text)}"
            )
        if not display_messages:
            self.console.print("[dim italic]No messages yet...[/]")

        self.console.print("─" * 60)
        self.console.print("[dim]Type message and press Enter. 'q' to quit.[/]")

    async def receive_loop(self, ws) -> None:
        """Consume JSON frames from *ws* and update the display.

        Handles the ``init``, ``message`` and ``user_left`` frame types; other
        types are ignored. Frames that are not valid JSON objects are skipped
        rather than terminating the loop. ``connected`` is cleared however the
        loop ends.
        """
        try:
            async for raw in ws:
                if not self.running:
                    break
                try:
                    data = json.loads(raw)
                except (TypeError, ValueError):
                    continue
                if not isinstance(data, dict):
                    continue

                msg_type = data.get("type", "")
                if msg_type == "init":
                    self.messages = data.get("messages", [])
                    self.users = data.get("users", [])
                    self.connected = True
                elif msg_type == "message":
                    self.messages.append(data.get("data", {}))
                elif msg_type == "user_left":
                    left_id = data.get("user_id")
                    self.users = [
                        u for u in self.users if u.get("user_id") != left_id
                    ]
                else:
                    continue
                self.render_messages()
        except websockets.ConnectionClosed:
            pass
        finally:
            # Iteration also ends without an exception on a clean close.
            self.connected = False

    async def input_loop(self, ws) -> None:
        """Read lines from stdin and send non-blank ones over *ws*.

        Stops when the user types ``q``, ``quit`` or ``exit``, on EOF or
        Ctrl-C, or when the connection is closed.
        """
        loop = asyncio.get_running_loop()
        while self.running:
            try:
                # input() blocks, so run it in the default thread pool.
                text = await loop.run_in_executor(None, input)
            except (EOFError, KeyboardInterrupt):
                self.running = False
                break

            if text.lower() in ("q", "quit", "exit"):
                self.running = False
                break
            if not text.strip():
                continue
            try:
                await ws.send(text)
            except websockets.ConnectionClosed:
                self.connected = False
                self.running = False
                break

    async def run_async(self) -> None:
        """Run a full session: key generation, key exchange, then chat.

        Errors are reported on the console instead of being propagated.
        """
        self.console.clear()
        self.console.print(Panel("[bold cyan]CMD Chat Client[/]", expand=False))
        self.console.print()
        try:
            self.generate_keys()
            self.exchange_keys()
            self.info("Connecting to chat...")

            # Percent-encode the credentials so characters such as '&', '#'
            # or spaces cannot corrupt the query string.
            query = urlencode(
                {"user_id": self.user_id or "", "password": self.password},
                quote_via=quote,
            )
            url = f"{self.ws_url}/ws/chat?{query}"

            async with websockets.connect(url) as ws:
                self.success("Connected to chat server")
                self.running = True
                tasks = [
                    asyncio.create_task(self.receive_loop(ws)),
                    asyncio.create_task(self.input_loop(ws)),
                ]
                try:
                    await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                finally:
                    self.running = False
                    # cancel() is a no-op for the task that already finished.
                    # NOTE(review): cancelling the input task does not
                    # interrupt the input() call blocked in the executor
                    # thread; shutdown presumably waits for one more line on
                    # stdin -- confirm.
                    for task in tasks:
                        task.cancel()
                    results = await asyncio.gather(*tasks, return_exceptions=True)

                # Surface failures that would otherwise be lost with the task.
                for result in results:
                    if isinstance(result, Exception):
                        self.error(f"Error: {escape(str(result))}")

            self.console.print("\n[yellow]Disconnected[/]")
        except requests.exceptions.ConnectionError:
            self.error(f"Cannot connect to {self.base_url}")
        except requests.exceptions.HTTPError as e:
            self.error(
                f"Server error: {e.response.status_code} - "
                f"{escape(e.response.text)}"
            )
        except Exception as e:
            self.error(f"Error: {escape(str(e))}")
            traceback.print_exc()
        finally:
            self.running = False
            self.connected = False

    def run(self) -> None:
        """Blocking entry point: run run_async() in a new event loop."""
        asyncio.run(self.run_async())