import asyncio
import base64
import json
from typing import Optional

import requests
import srp
import websockets
from cryptography.fernet import Fernet
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel

srp.rfc5054_enable()


class Client:
    """Terminal chat client.

    Authenticates against the server's ``/srp/init`` and ``/srp/verify``
    HTTP endpoints, then opens a WebSocket to ``/ws/chat`` and runs two
    concurrent loops: one rendering incoming frames, one sending lines
    typed by the user.
    """

    # Number of most recent messages shown by render_messages().
    _VISIBLE_MESSAGES = 15

    def __init__(
        self, server: str, port: int, username: str, password: Optional[str] = None
    ):
        """Store connection settings and initialise empty session state.

        Args:
            server: Host name or address of the chat server.
            port: TCP port used for both HTTP and WebSocket traffic.
            username: Display name sent to the server during SRP init.
            password: SRP password; ``None`` is treated as the empty string.
        """
        self.server = server
        self.port = port
        self.username = username
        self.password = (password or "").encode()
        # Assigned from the server's /srp/init response.
        self.user_id: Optional[str] = None
        # Built from the session key returned by /srp/verify. It is not used
        # by any other method of this class.
        self.fernet: Optional[Fernet] = None
        self.console = Console()
        self.messages: list[dict] = []
        self.users: list[dict] = []
        self.connected = False
        self.running = False

    @property
    def base_url(self) -> str:
        """HTTP base URL of the server."""
        return f"http://{self.server}:{self.port}"

    @property
    def ws_url(self) -> str:
        """WebSocket base URL of the server."""
        return f"ws://{self.server}:{self.port}"

    def success(self, message: str) -> None:
        """Print *message* as a green success line (Rich markup is honoured)."""
        self.console.print(f"[green]✓ {message}[/]")

    def error(self, message: str) -> None:
        """Print *message* as a red error line (Rich markup is honoured)."""
        self.console.print(f"[red]✗ {message}[/]")

    def info(self, message: str) -> None:
        """Print *message* as a cyan info line (Rich markup is honoured)."""
        self.console.print(f"[cyan]• {message}[/]")

    def srp_authenticate(self) -> None:
        """Run the two-step SRP handshake and store the session credentials.

        On success ``self.user_id`` and ``self.fernet`` are populated.

        Raises:
            ValueError: If the challenge cannot be processed or the server's
                proof does not verify. Decoding errors from ``base64`` and an
                invalid Fernet key are also ``ValueError`` subclasses.
            requests.exceptions.RequestException: On transport or HTTP errors.
            KeyError: If an expected field is missing from a server response.
        """
        with self.console.status("[cyan]Starting SRP handshake...[/]", spinner="dots"):
            # NOTE(review): the SRP identity is the fixed name b"chat", not
            # self.username; presumably a shared password. Confirm with server.
            usr = srp.User(b"chat", self.password, hash_alg=srp.SHA256)
            _, A = usr.start_authentication()

            resp = requests.post(
                f"{self.base_url}/srp/init",
                json={
                    "username": self.username,
                    "A": base64.b64encode(A).decode(),
                },
                timeout=30,
            )
            resp.raise_for_status()
            init_data = resp.json()

            self.user_id = init_data["user_id"]
            B = base64.b64decode(init_data["B"])
            salt = base64.b64decode(init_data["salt"])

            M = usr.process_challenge(salt, B)
            if M is None:
                raise ValueError("SRP challenge processing failed")

            resp = requests.post(
                f"{self.base_url}/srp/verify",
                json={
                    "user_id": self.user_id,
                    "username": self.username,
                    "M": base64.b64encode(M).decode(),
                },
                timeout=30,
            )
            resp.raise_for_status()
            verify_data = resp.json()

            # Verify the server's proof so authentication is mutual.
            H_AMK = base64.b64decode(verify_data["H_AMK"])
            usr.verify_session(H_AMK)
            if not usr.authenticated():
                raise ValueError("Server authentication failed")

            session_key = base64.b64decode(verify_data["session_key"])
            self.fernet = Fernet(session_key)

        self.success(f"SRP authenticated (session: {self.user_id[:8]}...)")

    def render_messages(self) -> None:
        """Clear the console and redraw the user list and recent messages.

        Usernames, message text and timestamps come from the server, so they
        are escaped before being embedded in Rich markup. Otherwise a message
        such as ``[/]`` would raise ``MarkupError`` or restyle the screen.
        """
        self.console.clear()

        users_online = (
            ", ".join(escape(str(u.get("username", "?"))) for u in self.users)
            or "none"
        )
        self.console.print(f"[dim]Online: {users_online}[/]")
        self.console.print("─" * 60)

        # A negative slice already copes with lists shorter than the window.
        display_messages = self.messages[-self._VISIBLE_MESSAGES:]
        for msg in display_messages:
            username = msg.get("username", "unknown")
            text = escape(str(msg.get("text", "")))
            # Keep "YYYY-MM-DD HH:MM:SS" from an ISO-style timestamp.
            timestamp = escape(
                str(msg.get("timestamp", ""))[:19].replace("T", " ")
            )
            # Own messages are green, everybody else's cyan.
            style = "green" if username == self.username else "cyan"
            shown_name = escape(str(username))
            self.console.print(
                f"[dim]{timestamp}[/] [{style}]{shown_name}[/]: {text}"
            )

        if not display_messages:
            self.console.print("[dim italic]No messages yet...[/]")

        self.console.print("─" * 60)
        self.console.print("[dim]Type message and press Enter. 'q' to quit.[/]")

    async def receive_loop(self, ws) -> None:
        """Consume JSON frames from *ws* and update local state.

        Handles frame types ``init`` (replace messages and users),
        ``message`` (append one message) and ``user_left`` (drop a user).
        Other types are ignored. Stops when ``self.running`` is cleared or
        the connection closes.
        """
        try:
            async for raw in ws:
                if not self.running:
                    break

                data = json.loads(raw)
                msg_type = data.get("type", "")

                if msg_type == "init":
                    self.messages = data.get("messages", [])
                    self.users = data.get("users", [])
                    self.connected = True
                    self.render_messages()
                elif msg_type == "message":
                    self.messages.append(data.get("data", {}))
                    self.render_messages()
                elif msg_type == "user_left":
                    left_id = data.get("user_id")
                    self.users = [
                        u for u in self.users if u.get("user_id") != left_id
                    ]
                    self.render_messages()
        except websockets.ConnectionClosed:
            self.connected = False

    async def input_loop(self, ws) -> None:
        """Read lines from stdin and send non-blank ones over *ws*.

        ``q``, ``quit`` or ``exit`` (case-insensitive), EOF or Ctrl-C end
        the loop and clear ``self.running``.
        """
        loop = asyncio.get_running_loop()
        while self.running:
            try:
                # input() blocks, so run it in the default executor to keep
                # the event loop free for the receive task.
                text = await loop.run_in_executor(None, input)
                if text.lower() in ("q", "quit", "exit"):
                    self.running = False
                    break
                if text.strip():
                    await ws.send(text)
            except (EOFError, KeyboardInterrupt):
                self.running = False
                break

    async def run_async(self) -> None:
        """Authenticate, connect and run the chat session until either loop ends.

        All failures are reported on the console; nothing is re-raised.
        """
        self.console.clear()
        self.console.print(Panel("[bold cyan]CMD Chat Client[/]", expand=False))
        self.console.print()

        try:
            self.srp_authenticate()
            self.info("Connecting to chat...")

            url = f"{self.ws_url}/ws/chat?user_id={self.user_id}"
            async with websockets.connect(url) as ws:
                self.success("Connected to chat server")
                self.running = True

                receive_task = asyncio.create_task(self.receive_loop(ws))
                input_task = asyncio.create_task(self.input_loop(ws))

                done, pending = await asyncio.wait(
                    [receive_task, input_task],
                    return_when=asyncio.FIRST_COMPLETED,
                )

                # Tell the surviving loop to stop, then drain it so no task
                # outlives the WebSocket it was using.
                self.running = False
                self.connected = False
                for task in pending:
                    task.cancel()
                # NOTE: a blocking input() call in the executor thread cannot
                # be interrupted; only the awaiting task is cancelled here.
                await asyncio.gather(*pending, return_exceptions=True)

                # Report a crash in the finished loop instead of leaving the
                # exception unretrieved.
                for task in done:
                    if task.cancelled():
                        continue
                    exc = task.exception()
                    if exc is not None:
                        self.error(
                            f"Chat loop failed: {escape(repr(exc))}"
                        )

            self.console.print("\n[yellow]Disconnected[/]")

        except requests.exceptions.ConnectionError:
            self.error(f"Cannot connect to {self.base_url}")
        except requests.exceptions.HTTPError as e:
            # The response body is server-controlled; escape it so it cannot
            # be interpreted as Rich markup.
            self.error(
                f"Server error: {e.response.status_code} - "
                f"{escape(e.response.text)}"
            )
        except ValueError as e:
            self.error(f"Authentication failed: {e}")
        except Exception:
            import traceback

            self.error("Error occurred")
            traceback.print_exc()

    def run(self) -> None:
        """Blocking entry point: run :meth:`run_async` in a new event loop."""
        asyncio.run(self.run_async())