ICEYOU/src/iceyou/motion_recorder.py

180 lines
6.4 KiB
Python

"""Motion detection + clip recording for the ICEYOU whiteout.
Runs only while the whiteout is shown. Continuously samples webcam frames
into a rolling pre-buffer (default 5s). On motion: dumps the pre-buffer to
an AVI clip, then keeps recording for `post_seconds` more. Fires `on_event`
with the resulting clip path when finalized.
Uses simple frame-differencing for motion detection - cheap and effective
in a controlled indoor scene.
"""
import time
import threading
from collections import deque
from pathlib import Path
from typing import Callable, Optional
import cv2
from .utils import get_timestamp
class MotionRecorder:
    """Detect motion on a camera feed and record a clip around each event.

    While started, a daemon thread polls ``camera.read_frame()`` at roughly
    ``fps`` frames per second and keeps the newest frames in a rolling
    pre-event buffer. When frame differencing reports motion, the buffer is
    dumped to a new MJPG ``.avi`` file in ``clips_dir`` and recording
    continues until ``post_seconds`` have passed without further motion.
    When a clip is closed, ``on_event`` is called with its path.

    ``on_event`` runs on the worker thread (or on the thread calling
    ``stop()`` when a clip is still open at shutdown), so a slow callback
    delays frame capture.
    """

    def __init__(
        self,
        camera,  # anything with read_frame() - Camera OR CameraStream
        clips_dir: str = "motion_clips",
        fps: int = 15,
        pre_seconds: float = 5.0,
        post_seconds: float = 5.0,
        sensitivity: int = 2500,  # min motion contour area (pixels)
        cooldown_seconds: float = 8.0,
        on_event: Optional[Callable[[str], None]] = None,
        cam_label: str = "",  # appended to clip filename (e.g. "cam1") for multi-cam
    ):
        """Configure the recorder and create ``clips_dir`` if it is missing.

        Args:
            camera: Object exposing ``read_frame()``; a ``None`` return is
                treated as "no frame available yet".
            clips_dir: Directory that receives the recorded clips.
            fps: Sampling and playback rate; values below 5 are raised to 5.
            pre_seconds: Length of the rolling pre-event buffer (minimum 0.5).
            post_seconds: How long to keep recording after the last detected
                motion (minimum 0.5).
            sensitivity: Minimum contour area, in pixels, that counts as
                motion.
            cooldown_seconds: Minimum time between the starts of two events.
            on_event: Callback receiving the clip path once a clip is closed.
                Defaults to a no-op.
            cam_label: Optional label appended to the clip filename.
        """
        self.camera = camera
        self.cam_label = str(cam_label or "").strip()
        self.clips_dir = Path(clips_dir)
        self.clips_dir.mkdir(parents=True, exist_ok=True)
        self.fps = max(5, int(fps))
        self.pre_seconds = max(0.5, float(pre_seconds))
        self.post_seconds = max(0.5, float(post_seconds))
        self.sensitivity = int(sensitivity)
        self.cooldown_seconds = float(cooldown_seconds)
        self.on_event = on_event or (lambda path: None)

        # Rolling pre-event buffer: the oldest frame falls off automatically.
        self._buffer = deque(maxlen=int(self.fps * self.pre_seconds))
        self._prev_gray = None
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._writer = None
        self._writer_path: Optional[Path] = None
        # Both timestamps are on the time.monotonic() clock so wall-clock
        # adjustments cannot shorten/extend a clip or block new events.
        self._recording_until = 0.0
        # -inf guarantees the very first motion is never inside the cooldown,
        # whatever the current monotonic reading is.
        self._last_event_time = float("-inf")
        # Limits write-failure reports to one per clip.
        self._write_error_reported = False

    @property
    def active(self) -> bool:
        """Whether the recorder has been started and not yet stopped."""
        return self._running

    def start(self) -> None:
        """Start the background sampling thread; no-op if already running."""
        if self._running:
            return
        self._running = True
        self._buffer.clear()
        self._prev_gray = None
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Stop sampling, close any open clip and drop buffered frames.

        Safe to call from inside ``on_event``: that callback runs on the
        worker thread, and joining the current thread would raise
        ``RuntimeError``, so the join is skipped in that case and the loop
        exits on its own once the callback returns.
        """
        if not self._running:
            return
        self._running = False
        worker = self._thread
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=2.0)
        self._thread = None
        self._finalize_writer()  # ensure any open clip is closed cleanly
        self._buffer.clear()
        self._prev_gray = None

    # ---- internal ----
    def _detect_motion(self, gray) -> bool:
        """Return True when ``gray`` differs enough from the previous frame.

        The first frame (or a frame whose shape changed) only establishes the
        baseline and reports no motion. Otherwise the absolute difference is
        thresholded and dilated, and motion is reported when any external
        contour covers at least ``sensitivity`` pixels.
        """
        if self._prev_gray is None or self._prev_gray.shape != gray.shape:
            self._prev_gray = gray
            return False
        diff = cv2.absdiff(self._prev_gray, gray)
        self._prev_gray = gray
        _, thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
        thresh = cv2.dilate(thresh, None, iterations=2)
        contours, _ = cv2.findContours(
            thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        return any(cv2.contourArea(c) >= self.sensitivity for c in contours)

    def _write_frame(self, frame) -> None:
        """Append ``frame`` to the open clip, best-effort.

        A failing write must not kill the capture loop, so errors are caught;
        only the first failure per clip is reported to avoid flooding the log
        at frame rate.
        """
        writer = self._writer
        if writer is None:
            return
        try:
            writer.write(frame)
        except Exception as e:
            if not self._write_error_reported:
                self._write_error_reported = True
                print(f"[MotionRecorder] frame write error: {e}")

    def _begin_recording(self, sample_frame) -> None:
        """Open a new clip sized like ``sample_frame`` and dump the pre-buffer.

        Does nothing when a clip is already open. If the writer cannot be
        opened the failure is reported and no clip is started.
        """
        if self._writer is not None:
            return
        h, w = sample_frame.shape[:2]
        suffix = f"_{self.cam_label}" if self.cam_label else ""
        path = self.clips_dir / f"motion_{get_timestamp()}{suffix}.avi"
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(str(path), fourcc, self.fps, (w, h))
        if not writer.isOpened():
            print("[MotionRecorder] VideoWriter failed to open")
            return
        self._writer = writer
        self._writer_path = path
        self._write_error_reported = False
        # Dump pre-event buffer first so the clip starts BEFORE the motion.
        # The triggering frame is already the newest buffer entry.
        for buffered in self._buffer:
            self._write_frame(buffered)

    def _finalize_writer(self) -> None:
        """Close the open clip, if any, and notify ``on_event`` with its path.

        Errors from releasing the writer are ignored; errors raised by the
        callback are reported but never propagated.
        """
        if self._writer is None:
            return
        try:
            self._writer.release()
        except Exception:
            pass
        path = self._writer_path
        # Reset state before the callback so a callback that calls stop()
        # finds nothing left to finalize.
        self._writer = None
        self._writer_path = None
        self._recording_until = 0.0
        if path is not None:
            try:
                self.on_event(str(path))
            except Exception as e:
                print(f"[MotionRecorder] on_event error: {e}")

    def _loop(self) -> None:
        """Worker-thread body: sample, buffer, detect and record until stopped."""
        period = 1.0 / self.fps
        # Motion detection is skipped for a short warm-up after start; frames
        # are still buffered during it. The first detection call afterwards
        # only stores the baseline frame.
        warmup_until = time.monotonic() + 0.7
        while self._running:
            t0 = time.monotonic()
            frame = self.camera.read_frame()
            if frame is None:
                time.sleep(period)
                continue
            self._buffer.append(frame)
            try:
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                gray = cv2.GaussianBlur(gray, (21, 21), 0)
            except Exception:
                # Frame cannot be analysed; it is still buffered/recorded.
                gray = None
            now = time.monotonic()

            # If currently recording, keep writing post-event frames
            if self._writer is not None:
                self._write_frame(frame)
                if now >= self._recording_until:
                    self._finalize_writer()

            # Motion detection (skipped during warm-up); a new event is only
            # started once the cooldown since the previous event has elapsed.
            if gray is not None and now > warmup_until:
                if self._detect_motion(gray):
                    if now - self._last_event_time >= self.cooldown_seconds:
                        self._last_event_time = now
                        self._begin_recording(frame)
                    # Extend post-roll while motion continues
                    if self._writer is not None:
                        self._recording_until = now + self.post_seconds

            # Sleep off the rest of the frame period to hold the target rate.
            elapsed = time.monotonic() - t0
            if elapsed < period:
                time.sleep(period - elapsed)