205 lines
6.7 KiB
Python
205 lines
6.7 KiB
Python
import asyncio
|
|
import json
|
|
import base64
|
|
from typing import Optional
|
|
|
|
import srp
|
|
import requests
|
|
from cryptography.fernet import Fernet
|
|
import websockets
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
|
|
srp.rfc5054_enable()
|
|
|
|
|
|
class Client:
|
|
def __init__(
|
|
self, server: str, port: int, username: str, password: Optional[str] = None
|
|
):
|
|
self.server = server
|
|
self.port = port
|
|
self.username = username
|
|
self.password = (password or "").encode()
|
|
self.user_id: Optional[str] = None
|
|
self.fernet: Optional[Fernet] = None
|
|
|
|
self.console = Console()
|
|
self.messages: list[dict] = []
|
|
self.users: list[dict] = []
|
|
self.connected = False
|
|
self.running = False
|
|
|
|
@property
|
|
def base_url(self) -> str:
|
|
return f"http://{self.server}:{self.port}"
|
|
|
|
@property
|
|
def ws_url(self) -> str:
|
|
return f"ws://{self.server}:{self.port}"
|
|
|
|
def success(self, message: str) -> None:
|
|
self.console.print(f"[green]✓ {message}[/]")
|
|
|
|
def error(self, message: str) -> None:
|
|
self.console.print(f"[red]✗ {message}[/]")
|
|
|
|
def info(self, message: str) -> None:
|
|
self.console.print(f"[cyan]• {message}[/]")
|
|
|
|
def srp_authenticate(self) -> None:
|
|
with self.console.status("[cyan]Starting SRP handshake...[/]", spinner="dots"):
|
|
|
|
usr = srp.User(b"chat", self.password, hash_alg=srp.SHA256)
|
|
_, A = usr.start_authentication()
|
|
|
|
resp = requests.post(
|
|
f"{self.base_url}/srp/init",
|
|
json={
|
|
"username": self.username,
|
|
"A": base64.b64encode(A).decode(),
|
|
},
|
|
timeout=30,
|
|
)
|
|
resp.raise_for_status()
|
|
init_data = resp.json()
|
|
|
|
self.user_id = init_data["user_id"]
|
|
B = base64.b64decode(init_data["B"])
|
|
salt = base64.b64decode(init_data["salt"])
|
|
|
|
M = usr.process_challenge(salt, B)
|
|
|
|
if M is None:
|
|
raise ValueError("SRP challenge processing failed")
|
|
|
|
resp = requests.post(
|
|
f"{self.base_url}/srp/verify",
|
|
json={
|
|
"user_id": self.user_id,
|
|
"username": self.username,
|
|
"M": base64.b64encode(M).decode(),
|
|
},
|
|
timeout=30,
|
|
)
|
|
resp.raise_for_status()
|
|
verify_data = resp.json()
|
|
|
|
H_AMK = base64.b64decode(verify_data["H_AMK"])
|
|
usr.verify_session(H_AMK)
|
|
|
|
if not usr.authenticated():
|
|
raise ValueError("Server authentication failed")
|
|
|
|
session_key = base64.b64decode(verify_data["session_key"])
|
|
self.fernet = Fernet(session_key)
|
|
|
|
self.success(f"SRP authenticated (session: {self.user_id[:8]}...)")
|
|
|
|
def render_messages(self) -> None:
|
|
self.console.clear()
|
|
|
|
users_online = ", ".join(u.get("username", "?") for u in self.users) or "none"
|
|
self.console.print(f"[dim]Online: {users_online}[/]")
|
|
self.console.print("─" * 60)
|
|
|
|
display_messages = (
|
|
self.messages[-15:] if len(self.messages) > 15 else self.messages
|
|
)
|
|
|
|
for msg in display_messages:
|
|
username = msg.get("username", "unknown")
|
|
text = msg.get("text", "")
|
|
timestamp = str(msg.get("timestamp", ""))[:19].replace("T", " ")
|
|
|
|
style = "green" if username == self.username else "cyan"
|
|
self.console.print(f"[dim]{timestamp}[/] [{style}]{username}[/]: {text}")
|
|
|
|
if not display_messages:
|
|
self.console.print("[dim italic]No messages yet...[/]")
|
|
|
|
self.console.print("─" * 60)
|
|
self.console.print("[dim]Type message and press Enter. 'q' to quit.[/]")
|
|
|
|
async def receive_loop(self, ws) -> None:
|
|
try:
|
|
async for raw in ws:
|
|
if not self.running:
|
|
break
|
|
|
|
data = json.loads(raw)
|
|
msg_type = data.get("type", "")
|
|
|
|
if msg_type == "init":
|
|
self.messages = data.get("messages", [])
|
|
self.users = data.get("users", [])
|
|
self.connected = True
|
|
self.render_messages()
|
|
elif msg_type == "message":
|
|
msg_data = data.get("data", {})
|
|
self.messages.append(msg_data)
|
|
self.render_messages()
|
|
elif msg_type == "user_left":
|
|
left_id = data.get("user_id")
|
|
self.users = [u for u in self.users if u.get("user_id") != left_id]
|
|
self.render_messages()
|
|
|
|
except websockets.ConnectionClosed:
|
|
self.connected = False
|
|
|
|
async def input_loop(self, ws) -> None:
|
|
loop = asyncio.get_event_loop()
|
|
while self.running:
|
|
try:
|
|
text = await loop.run_in_executor(None, input)
|
|
if text.lower() in ("q", "quit", "exit"):
|
|
self.running = False
|
|
break
|
|
if text.strip():
|
|
await ws.send(text)
|
|
except (EOFError, KeyboardInterrupt):
|
|
self.running = False
|
|
break
|
|
|
|
async def run_async(self) -> None:
|
|
self.console.clear()
|
|
self.console.print(Panel("[bold cyan]CMD Chat Client[/]", expand=False))
|
|
self.console.print()
|
|
|
|
try:
|
|
self.srp_authenticate()
|
|
|
|
self.info("Connecting to chat...")
|
|
url = f"{self.ws_url}/ws/chat?user_id={self.user_id}"
|
|
|
|
async with websockets.connect(url) as ws:
|
|
self.success("Connected to chat server")
|
|
self.running = True
|
|
|
|
receive_task = asyncio.create_task(self.receive_loop(ws))
|
|
input_task = asyncio.create_task(self.input_loop(ws))
|
|
|
|
done, pending = await asyncio.wait(
|
|
[receive_task, input_task], return_when=asyncio.FIRST_COMPLETED
|
|
)
|
|
|
|
for task in pending:
|
|
task.cancel()
|
|
|
|
self.console.print("\n[yellow]Disconnected[/]")
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
self.error(f"Cannot connect to {self.base_url}")
|
|
except requests.exceptions.HTTPError as e:
|
|
self.error(f"Server error: {e.response.status_code} - {e.response.text}")
|
|
except ValueError as e:
|
|
self.error(f"Authentication failed: {e}")
|
|
except Exception:
|
|
import traceback
|
|
|
|
self.error("Error occurred")
|
|
traceback.print_exc()
|
|
|
|
def run(self) -> None:
|
|
asyncio.run(self.run_async())
|