180 lines
6.4 KiB
Python
180 lines
6.4 KiB
Python
"""Motion detection + clip recording for the ICEYOU whiteout.
|
|
|
|
Runs only while the whiteout is shown. Continuously samples webcam frames
|
|
into a rolling pre-buffer (default 5s). On motion: dumps the pre-buffer to
|
|
an AVI clip, then keeps recording for `post_seconds` more. Fires `on_event`
|
|
with the resulting clip path when finalized.
|
|
|
|
Uses simple frame-differencing for motion detection - cheap and effective
|
|
in a controlled indoor scene.
|
|
"""
|
|
|
|
import time
|
|
import threading
|
|
from collections import deque
|
|
from pathlib import Path
|
|
from typing import Callable, Optional
|
|
|
|
import cv2
|
|
|
|
from .utils import get_timestamp
|
|
|
|
|
|
class MotionRecorder:
|
|
def __init__(
|
|
self,
|
|
camera, # anything with read_frame() - Camera OR CameraStream
|
|
clips_dir: str = "motion_clips",
|
|
fps: int = 15,
|
|
pre_seconds: float = 5.0,
|
|
post_seconds: float = 5.0,
|
|
sensitivity: int = 2500, # min motion contour area (pixels)
|
|
cooldown_seconds: float = 8.0,
|
|
on_event: Optional[Callable[[str], None]] = None,
|
|
cam_label: str = "", # appended to clip filename (e.g. "cam1") for multi-cam
|
|
):
|
|
self.camera = camera
|
|
self.cam_label = str(cam_label or "").strip()
|
|
self.clips_dir = Path(clips_dir)
|
|
self.clips_dir.mkdir(parents=True, exist_ok=True)
|
|
self.fps = max(5, int(fps))
|
|
self.pre_seconds = max(0.5, float(pre_seconds))
|
|
self.post_seconds = max(0.5, float(post_seconds))
|
|
self.sensitivity = int(sensitivity)
|
|
self.cooldown_seconds = float(cooldown_seconds)
|
|
self.on_event = on_event or (lambda path: None)
|
|
|
|
self._buffer = deque(maxlen=int(self.fps * self.pre_seconds))
|
|
self._prev_gray = None
|
|
self._running = False
|
|
self._thread: Optional[threading.Thread] = None
|
|
|
|
self._writer = None
|
|
self._writer_path: Optional[Path] = None
|
|
self._recording_until = 0.0
|
|
self._last_event_time = 0.0
|
|
|
|
@property
|
|
def active(self) -> bool:
|
|
return self._running
|
|
|
|
def start(self) -> None:
|
|
if self._running:
|
|
return
|
|
self._running = True
|
|
self._buffer.clear()
|
|
self._prev_gray = None
|
|
self._thread = threading.Thread(target=self._loop, daemon=True)
|
|
self._thread.start()
|
|
|
|
def stop(self) -> None:
|
|
if not self._running:
|
|
return
|
|
self._running = False
|
|
if self._thread:
|
|
self._thread.join(timeout=2.0)
|
|
self._thread = None
|
|
self._finalize_writer() # ensure any open clip is closed cleanly
|
|
self._buffer.clear()
|
|
self._prev_gray = None
|
|
|
|
# ---- internal ----
|
|
|
|
def _detect_motion(self, gray) -> bool:
|
|
if self._prev_gray is None or self._prev_gray.shape != gray.shape:
|
|
self._prev_gray = gray
|
|
return False
|
|
diff = cv2.absdiff(self._prev_gray, gray)
|
|
self._prev_gray = gray
|
|
_, thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
|
|
thresh = cv2.dilate(thresh, None, iterations=2)
|
|
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
|
for c in contours:
|
|
if cv2.contourArea(c) >= self.sensitivity:
|
|
return True
|
|
return False
|
|
|
|
def _begin_recording(self, sample_frame) -> None:
|
|
if self._writer is not None:
|
|
return
|
|
h, w = sample_frame.shape[:2]
|
|
suffix = f"_{self.cam_label}" if self.cam_label else ""
|
|
path = self.clips_dir / f"motion_{get_timestamp()}{suffix}.avi"
|
|
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
|
|
writer = cv2.VideoWriter(str(path), fourcc, self.fps, (w, h))
|
|
if not writer.isOpened():
|
|
print("[MotionRecorder] VideoWriter failed to open")
|
|
return
|
|
self._writer = writer
|
|
self._writer_path = path
|
|
# Dump pre-event buffer first so the clip starts BEFORE the motion
|
|
for buffered in list(self._buffer):
|
|
try:
|
|
self._writer.write(buffered)
|
|
except Exception:
|
|
pass
|
|
|
|
def _finalize_writer(self) -> None:
|
|
if self._writer is None:
|
|
return
|
|
try:
|
|
self._writer.release()
|
|
except Exception:
|
|
pass
|
|
path = self._writer_path
|
|
self._writer = None
|
|
self._writer_path = None
|
|
self._recording_until = 0.0
|
|
if path is not None:
|
|
try:
|
|
self.on_event(str(path))
|
|
except Exception as e:
|
|
print(f"[MotionRecorder] on_event error: {e}")
|
|
|
|
def _loop(self) -> None:
|
|
period = 1.0 / self.fps
|
|
# Small warm-up: grab a few frames before motion detection kicks in
|
|
# so the previous-frame baseline isn't blank.
|
|
warmup_until = time.time() + 0.7
|
|
|
|
while self._running:
|
|
t0 = time.time()
|
|
frame = self.camera.read_frame()
|
|
if frame is None:
|
|
time.sleep(period)
|
|
continue
|
|
|
|
self._buffer.append(frame)
|
|
|
|
try:
|
|
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
gray = cv2.GaussianBlur(gray, (21, 21), 0)
|
|
except Exception:
|
|
gray = None
|
|
|
|
now = time.time()
|
|
|
|
# If currently recording, keep writing post-event frames
|
|
if self._writer is not None:
|
|
try:
|
|
self._writer.write(frame)
|
|
except Exception:
|
|
pass
|
|
if now >= self._recording_until:
|
|
self._finalize_writer()
|
|
|
|
# Motion detection (skip during warmup + during active cooldown)
|
|
if gray is not None and now > warmup_until:
|
|
moved = self._detect_motion(gray)
|
|
if moved:
|
|
if now - self._last_event_time >= self.cooldown_seconds:
|
|
self._last_event_time = now
|
|
self._begin_recording(frame)
|
|
# Extend post-roll while motion continues
|
|
if self._writer is not None:
|
|
self._recording_until = now + self.post_seconds
|
|
|
|
elapsed = time.time() - t0
|
|
if elapsed < period:
|
|
time.sleep(period - elapsed)
|