ICEYOU/src/iceyou/actions.py

175 lines
6.9 KiB
Python

"""Action handlers triggered on intrusion detection: email, lock, etc."""
import smtplib
import ssl
import json
from email.message import EmailMessage
from pathlib import Path
from typing import Optional
import threading
from .config import Config
from .utils import lock_workstation, get_timestamp
class Actions:
def __init__(self, config: Config, monitor_ref=None):
self.config = config
self.monitor = monitor_ref # for events.log writing via monitor._log_event
# ---------- public entry points ----------
def handle_intrusion(self, reason: str, snapshot_path: Optional[str] = None,
snapshot_paths: Optional[list] = None) -> None:
"""Main entry point called by Monitor on intrusion.
Note: does NOT lock the workstation. OS lock is reserved for repeated
failed unlock attempts and is handled by the tray unlock dialog.
`snapshot_paths` (when supplied by multi-cam Monitor) attaches every
captured frame; `snapshot_path` is the legacy single-attachment path.
"""
subject = reason
body = (
"ICEYOU detected potential unauthorized access.\n\n"
f"Reason: {reason}\n"
f"Time: {get_timestamp()}\n\n"
"Check attached snapshot(s) (if available) and logs."
)
attachments = list(snapshot_paths) if snapshot_paths else (
[snapshot_path] if snapshot_path else []
)
self.send_alert(subject, body, attachments)
def handle_unlock_attempt(self, record: dict) -> None:
"""Email an alert for an interactive unlock attempt (password or face).
Includes the entered credential text on failed password attempts so
you can review what the intruder typed.
Supports both `record["snapshot"]` (single path, back-compat) and
`record["snapshots"]` (list of paths from multi-cam capture).
"""
method = record.get("method", "?")
success = bool(record.get("success", False))
snaps = record.get("snapshots")
if not snaps:
snap = record.get("snapshot")
snaps = [snap] if snap else []
if success and method == "password":
# Don't send/save the real owner password.
subject = f"Unlock success (password) at {get_timestamp()}"
body = (
"ICEYOU was unlocked successfully with the correct password.\n"
f"Time: {get_timestamp()}\n"
"Snapshot attached if available."
)
elif success and method == "face":
subject = (
f"Unlock success (face: {record.get('label')}, conf={record.get('confidence')}) "
f"at {get_timestamp()}"
)
body = (
"ICEYOU was unlocked by face recognition.\n"
f"Label: {record.get('label')}\n"
f"Confidence: {record.get('confidence')}\n"
f"Time: {get_timestamp()}\n"
)
elif method == "password":
entered = record.get("entered", "")
subject = f"FAILED unlock attempt (password) at {get_timestamp()}"
body = (
"ICEYOU recorded a FAILED password unlock attempt.\n\n"
f"Entered: {entered!r}\n"
f"Time: {get_timestamp()}\n\n"
"Snapshot of the person at the keyboard is attached."
)
else: # failed face
subject = f"FAILED unlock attempt (face) at {get_timestamp()}"
body = (
"ICEYOU recorded a FAILED face unlock attempt.\n\n"
f"Predicted label: {record.get('label')}\n"
f"Confidence: {record.get('confidence')}\n"
f"Time: {get_timestamp()}\n"
)
self.send_alert(subject, body, snaps)
def send_test_email(self) -> None:
self.send_alert(
"Test email",
f"This is a test from ICEYOU at {get_timestamp()}. If you see this, SMTP works.",
None,
)
# ---------- internal ----------
def send_alert(self, subject: str, body: str, attachments) -> None:
"""`attachments` may be None, a single path string, or a list of paths."""
if not self.config.get("email.enabled", False):
return
if attachments is None:
paths = []
elif isinstance(attachments, (list, tuple)):
paths = [p for p in attachments if p]
else:
paths = [attachments]
threading.Thread(
target=self._send_email,
args=(subject, body, paths),
daemon=True,
).start()
def _log(self, event: str, details: str) -> None:
if self.monitor is not None and hasattr(self.monitor, "_log_event"):
try:
self.monitor._log_event(event, details)
return
except Exception:
pass
print(f"[Actions] {event}: {details}")
def _send_email(self, subject: str, body: str, snapshot_paths) -> None:
email_cfg = self.config.get("email", {})
prefix = email_cfg.get("subject_prefix", "[ICEYOU Alert]")
# Normalize attachment paths to a list
if snapshot_paths is None:
paths = []
elif isinstance(snapshot_paths, (list, tuple)):
paths = list(snapshot_paths)
else:
paths = [snapshot_paths]
try:
msg = EmailMessage()
msg["Subject"] = f"{prefix} {subject}"
msg["From"] = email_cfg.get("from_addr", "")
msg["To"] = email_cfg.get("to_addr", "")
msg.set_content(body)
for p in paths:
if not p:
continue
pp = Path(p)
if not pp.exists():
continue
with open(pp, "rb") as f:
img_data = f.read()
msg.add_attachment(
img_data,
maintype="image",
subtype="jpeg",
filename=pp.name,
)
context = ssl.create_default_context()
host = email_cfg.get("smtp_server", "smtp.gmail.com")
port = email_cfg.get("smtp_port", 587)
with smtplib.SMTP(host, port, timeout=20) as server:
server.ehlo()
if email_cfg.get("use_tls", True):
server.starttls(context=context)
server.ehlo()
server.login(email_cfg.get("username", ""), email_cfg.get("password", ""))
server.send_message(msg)
self._log("email_sent", f"{subject} (atts={len(paths)})")
except Exception as e:
self._log("email_failed", f"{type(e).__name__}: {e}")