324 lines
13 KiB
Python
324 lines
13 KiB
Python
"""DNN-based face recognition using the face_recognition library (dlib + FaceNet-style model).
|
|
|
|
This provides significantly higher accuracy than the LBPH backend, especially
|
|
against look-alikes. It requires the `face-recognition` package (which depends
|
|
on dlib).
|
|
|
|
Installation on Windows is non-trivial:
|
|
- Install Visual Studio Build Tools + CMake first.
|
|
- Then: pip install face-recognition
|
|
|
|
See README for detailed instructions.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from typing import Optional, Tuple, List
|
|
import pickle
|
|
import numpy as np
|
|
|
|
try:
|
|
import face_recognition
|
|
FACE_RECOGNITION_AVAILABLE = True
|
|
except ImportError:
|
|
FACE_RECOGNITION_AVAILABLE = False
|
|
|
|
class DNNFaceRecognizer:
    """Face recognition using deep neural network embeddings (FaceNet style).

    Much more robust than LBPH for distinguishing similar-looking people.

    The trained model consists of two files inside ``faces_dir``:
    ``face_encodings.pkl`` (pickled encodings) and ``dnn_labels.json`` (labels).
    Entry ``i`` of the labels belongs to entry ``i`` of the encodings, so both
    must have the same length; a model that violates this is rejected on load.
    """

    def __init__(self, tolerance: float = 0.6, allowed_labels: Optional[List[str]] = None, faces_dir: str = "faces"):
        """Configure the recognizer and try to load the trained model.

        Args:
            tolerance: Maximum face distance that still counts as a match.
            allowed_labels: Labels that may produce a result. ``None`` or an
                empty list disables the filter.
            faces_dir: Directory holding the trained model files.
        """
        self.tolerance = float(tolerance)
        self.allowed_labels = set(allowed_labels) if allowed_labels else None
        self.faces_dir = Path(faces_dir)
        self.encodings_file = self.faces_dir / "face_encodings.pkl"
        self.labels_file = self.faces_dir / "dnn_labels.json"

        self._known_encodings: list = []
        self._known_labels: list = []
        self._loaded = False
        self._load()

    @property
    def available(self) -> bool:
        """Return True once a trained model has been loaded successfully."""
        # Model must be loaded. dlib (face_recognition) itself only needs to be
        # importable in the isolated worker subprocess, not in this process.
        return self._loaded

    def _load(self) -> None:
        """Load encodings and labels from disk; leave the model empty on failure."""
        if not self.encodings_file.exists() or not self.labels_file.exists():
            print("[DNNFaceRecognizer] No trained DNN model found. Run enrollment with DNN backend.")
            return

        try:
            # SECURITY: pickle.load can execute arbitrary code. Only point
            # faces_dir at a location that untrusted users cannot write to.
            with open(self.encodings_file, "rb") as f:
                encodings = pickle.load(f)

            import json
            with open(self.labels_file, "r", encoding="utf-8") as f:
                labels = json.load(f)

            # Labels are looked up by encoding index, so a length mismatch
            # would mislabel faces or raise IndexError during verification.
            if len(encodings) != len(labels):
                raise ValueError(
                    f"model is inconsistent: {len(encodings)} encoding(s) but {len(labels)} label(s)"
                )
        except Exception as e:
            print(f"[DNNFaceRecognizer] Failed to load model: {e}")
            return

        # Assign only after both files were read and validated so a failed
        # load never leaves half-initialised state behind.
        self._known_encodings = encodings
        self._known_labels = labels
        self._loaded = True
        print(f"[DNNFaceRecognizer] Loaded DNN model with {len(self._known_labels)} identities")

    def verify_frame(self, frame) -> Tuple[bool, Optional[str], Optional[float]]:
        """Verify a frame using DNN embeddings.

        Args:
            frame: Image array in BGR channel order with shape (height, width, 3).

        Returns:
            (matched, label, distance). Lower distance = better match.
            ``(False, None, None)`` when the model is not loaded, the frame is
            unusable, no face is found, or any error occurs.

        Extremely defensive to avoid crashing the Tkinter white screen thread.
        """
        if not self.available or frame is None:
            return False, None, None

        try:
            # Validate BEFORE slicing: indexing a grayscale or non-array frame
            # with [:, :, ::-1] raises instead of being cleanly rejected.
            shape = getattr(frame, "shape", None)
            if shape is None or len(shape) != 3 or shape[2] != 3 or frame.size == 0:
                return False, None, None

            # Convert BGR to RGB
            rgb_frame = frame[:, :, ::-1]
            if rgb_frame.dtype != np.uint8:
                rgb_frame = rgb_frame.astype(np.uint8)
            # The reversed slice is a negative-stride view; dlib needs a
            # C-contiguous buffer, so materialise it.
            rgb_frame = np.ascontiguousarray(rgb_frame)

            # Detect faces
            face_locations = face_recognition.face_locations(rgb_frame)
            if not face_locations:
                return False, None, None

            face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
            if not face_encodings:
                return False, None, None

            best_label = None
            best_distance = None

            for encoding in face_encodings:
                try:
                    distances = face_recognition.face_distance(self._known_encodings, encoding)
                    if len(distances) == 0:
                        continue

                    min_distance_idx = int(np.argmin(distances))
                    min_distance = float(distances[min_distance_idx])
                    label = self._known_labels[min_distance_idx]

                    # A face whose closest identity is not allowed is ignored
                    # entirely (no fallback to a farther, allowed identity).
                    if self.allowed_labels and label not in self.allowed_labels:
                        continue

                    if best_distance is None or min_distance < best_distance:
                        best_distance = min_distance
                        best_label = label
                except Exception as inner_e:
                    print(f"[DNNFaceRecognizer] inner distance error: {inner_e}")
                    continue

            if best_label is None or best_distance is None:
                return False, None, None

            matched = best_distance <= self.tolerance
            return matched, best_label, best_distance

        except Exception as e:
            print(f"[DNNFaceRecognizer] verify_frame error: {type(e).__name__}: {e}")
            import traceback
            traceback.print_exc(limit=3)
            return False, None, None

    def verify_from_camera(self, camera, attempts: int = 3) -> Tuple[bool, Optional[str], Optional[float]]:
        """Capture several frames from ALL alive cameras and verify them in ONE
        isolated subprocess.

        Frames are captured here in the (safe) calling process, then the dlib
        matching is delegated to a single subprocess so that (a) a native dlib
        crash can never take down the main ICEYOU app, and (b) the model is
        loaded only once per attempt (fast -> button re-enables quickly).

        IMPORTANT: we sample EVERY working camera (not just the primary). On
        multi-cam rigs the user-facing camera is often not index 0, so reading
        only the primary stream caused face unlock to never see the user.

        Args:
            camera: Object exposing ``read_all_frames()`` (mapping of camera
                index to frame) or, as a fallback, ``read_frame()``.
            attempts: Number of capture rounds; values below 1 are treated as 1.

        Returns:
            (matched, label, distance) as reported by the worker, or
            ``(False, None, None)`` when no model is loaded or no frame could
            be saved.
        """
        if not self.available:
            return False, None, None

        import cv2
        import tempfile
        import os

        rounds = max(1, attempts)
        created: list = []  # every temp file we made -> always cleaned up
        usable: list = []   # subset that was written successfully

        def _save(frame) -> None:
            if frame is None:
                return
            fd, tmp_path = tempfile.mkstemp(suffix=".png", prefix="iceyou_dnn_")
            # Register for cleanup immediately so the file is removed even if
            # imwrite raises instead of returning False.
            created.append(tmp_path)
            os.close(fd)
            if cv2.imwrite(tmp_path, frame):
                usable.append(tmp_path)

        try:
            # Prefer per-camera capture so we sample every working stream.
            read_all = getattr(camera, "read_all_frames", None)
            for _ in range(rounds):
                if callable(read_all):
                    frames = read_all() or {}
                    for frame in frames.values():
                        _save(frame)
                else:
                    _save(camera.read_frame())

            try:
                cam_count = len(getattr(camera, "alive_streams", []) or [])
            except Exception:
                cam_count = 0
            captured = len(usable)
            print(f"[DNNFaceRecognizer] Captured {captured} frame(s) across {cam_count or 1} camera(s) x {rounds} round(s) for face unlock")
            if not usable:
                print("[DNNFaceRecognizer] No usable camera frames; face unlock will report no match")
                return False, None, None
            return self._verify_images_subprocess(usable)
        finally:
            for p in created:
                try:
                    os.remove(p)
                except OSError:
                    # Best-effort cleanup of scratch files.
                    pass

    def _parse_worker_output(self, stdout) -> Tuple[bool, Optional[str], Optional[float]]:
        """Interpret the worker's stdout; the last line must be a JSON object.

        Malformed results are rejected explicitly (fail closed) instead of
        relying on a formatting error. Invalid JSON raises ``ValueError`` for
        the caller to handle.
        """
        import json

        out = (stdout or "").strip().splitlines()
        if not out:
            return False, None, None

        result = json.loads(out[-1])
        if not isinstance(result, dict):
            print("[DNNFaceRecognizer] worker returned a malformed result (expected a JSON object); treating as no match")
            return False, None, None

        matched = bool(result.get("matched", False))
        label = result.get("label")
        distance = result.get("distance")

        numeric = isinstance(distance, (int, float))
        if (distance is not None and not numeric) or (matched and not numeric):
            # A match must carry a numeric distance; anything else means the
            # worker output cannot be trusted.
            print("[DNNFaceRecognizer] worker returned a malformed distance; treating as no match")
            return False, None, None

        if matched:
            print(f"[DNNFaceRecognizer] MATCH: label={label} distance={distance:.3f} (tolerance={self.tolerance:.3f})")
        elif distance is not None:
            print(f"[DNNFaceRecognizer] Worker result: best_distance={distance:.3f} (tolerance={self.tolerance:.3f}) — no match")
        else:
            print("[DNNFaceRecognizer] Worker result: no allowed face match (see diagnostics above)")
        return matched, label, distance

    def _verify_images_subprocess(self, image_paths: list) -> Tuple[bool, Optional[str], Optional[float]]:
        """Run dlib matching over the given images in a single subprocess.

        Returns (matched, label, distance). On ANY failure (including a native
        subprocess crash / non-zero exit) returns a safe no-match instead of
        propagating, so the parent process never dies.
        """
        import subprocess
        import os
        import sys

        try:
            # NOTE(review): labels are joined with commas, so a label that
            # itself contains a comma would be split; confirm against the worker.
            allowed_csv = ",".join(sorted(self.allowed_labels)) if self.allowed_labels else ""
            print(f"[DNNFaceRecognizer] Launching isolated DNN worker for {len(image_paths)} frame(s) (face recognition in subprocess)...")

            cmd = [
                sys.executable,
                "-m", "iceyou.dnn_verify_worker",
                "--encodings", str(self.encodings_file),
                "--labels", str(self.labels_file),
                "--tolerance", str(self.tolerance),
            ]
            if allowed_csv:
                cmd += ["--allowed", allowed_csv]
            cmd += list(image_paths)

            # Ensure the worker can import the iceyou package
            env = dict(os.environ)
            src_dir = str(Path(__file__).resolve().parent.parent)
            existing = env.get("PYTHONPATH", "")
            env["PYTHONPATH"] = src_dir + (os.pathsep + existing if existing else "")

            # Suppress the console window flash on Windows
            creationflags = 0
            if os.name == "nt":
                creationflags = getattr(subprocess, "CREATE_NO_WINDOW", 0)

            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=30,
                env=env,
                creationflags=creationflags,
            )

            if proc.returncode != 0:
                # Native crash / segfault in dlib -> isolated, parent survives
                print(f"[DNNFaceRecognizer] worker exited with code {proc.returncode} (isolated, app safe)")
                if proc.stderr:
                    print(f"[DNNFaceRecognizer] worker stderr: {proc.stderr.strip()[:300]}")
                return False, None, None

            # Always surface worker diagnostics (face counts, closest distance)
            if proc.stderr and proc.stderr.strip():
                for line in proc.stderr.strip().splitlines():
                    print(f"[DNNFaceRecognizer] {line.strip()}")

            return self._parse_worker_output(proc.stdout)

        except subprocess.TimeoutExpired:
            print("[DNNFaceRecognizer] worker timed out (isolated, app safe)")
            return False, None, None
        except Exception as e:
            print(f"[DNNFaceRecognizer] subprocess verify error: {type(e).__name__}: {e}")
            return False, None, None
|
|
|
|
|
|
def train_dnn_model(faces_dir: str = "faces") -> bool:
    """Train a DNN model from images in faces/<label>/ directories.

    This is a helper function for future enrollment script updates.

    Every ``*.png`` file in each sub-directory of ``faces_dir`` is encoded; the
    sub-directory name is the label. Only the first face found in an image is
    used. The result is written to ``face_encodings.pkl`` and
    ``dnn_labels.json`` inside ``faces_dir``.

    Args:
        faces_dir: Root directory containing one sub-directory per identity.

    Returns:
        True when a model was written; False when face_recognition is not
        available, the directory does not exist, or no face could be encoded.

    Raises:
        OSError: If the model files cannot be written.
    """
    if not FACE_RECOGNITION_AVAILABLE:
        print("face_recognition not available. Cannot train DNN model.")
        return False

    faces_path = Path(faces_dir)
    if not faces_path.is_dir():
        # iterdir() would raise here; report it like the other failure paths.
        print(f"Faces directory not found: {faces_path}")
        return False

    encodings = []
    labels = []

    # Sorted so the model contents do not depend on filesystem order.
    for label_dir in sorted(faces_path.iterdir()):
        if not label_dir.is_dir():
            continue
        label = label_dir.name
        for img_path in sorted(label_dir.glob("*.png")):
            try:
                image = face_recognition.load_image_file(str(img_path))
                face_encodings = face_recognition.face_encodings(image)
            except Exception as e:
                print(f"Failed to process {img_path}: {e}")
                continue
            if face_encodings:
                encodings.append(face_encodings[0])
                labels.append(label)

    if not encodings:
        print("No valid face encodings found.")
        return False

    encodings_file = faces_path / "face_encodings.pkl"
    labels_file = faces_path / "dnn_labels.json"

    import json

    # Write both files to temporary siblings first and only then swap them in,
    # so a failed write cannot leave a new encodings file next to old labels.
    tmp_encodings = encodings_file.with_name(encodings_file.name + ".tmp")
    tmp_labels = labels_file.with_name(labels_file.name + ".tmp")
    try:
        with open(tmp_encodings, "wb") as f:
            pickle.dump(encodings, f)
        with open(tmp_labels, "w", encoding="utf-8") as f:
            json.dump(labels, f, indent=2)
        tmp_encodings.replace(encodings_file)
        tmp_labels.replace(labels_file)
    finally:
        # Remove leftovers if anything above failed part-way.
        for leftover in (tmp_encodings, tmp_labels):
            if leftover.exists():
                leftover.unlink()

    print(f"Trained DNN model with {len(labels)} images across {len(set(labels))} identities.")
    return True
|