Extract video helpers; dedup camera/window loops

ck-zhang
2025-05-02 00:00:53 +08:00
parent d15e4c9684
commit 18af4bc4d3
3 changed files with 138 additions and 126 deletions
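
The dedup in a nutshell: both scripts hand-rolled the same open/read-loop/release lifecycle around cv2.VideoCapture, and this commit moves that lifecycle into shared context-manager helpers. A minimal before/after sketch of the repeated pattern (process() is a stand-in for the per-frame work, not part of the commit):

import cv2
from eyetrax.utils.video import camera, iter_frames

def process(frame):  # stand-in for each script's per-frame work
    pass

# Before: each script managed the capture lifecycle by hand
cap = cv2.VideoCapture(0)
while True:
    ret, frame = cap.read()
    if not ret:
        continue
    process(frame)
cap.release()

# After: camera() owns open/failure/release; iter_frames() owns the read loop
with camera(0) as cap:
    for frame in iter_frames(cap):
        process(frame)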


@@ -3,6 +3,7 @@ import cv2
 import numpy as np
 import os
 from eyetrax.utils.screen import get_screen_size
+from eyetrax.utils.video import camera, fullscreen, iter_frames
 from eyetrax.gaze import GazeEstimator
 from eyetrax.calibration import (
     run_9_point_calibration,
@@ -51,10 +52,6 @@ def run_demo():
     kalman = None
     smoother = NoSmoother()
-    cam_width, cam_height = 320, 240
-    BORDER = 2
-    MARGIN = 20
-
     if background_path and os.path.isfile(background_path):
         background = cv2.imread(background_path)
         background = cv2.resize(background, (screen_width, screen_height))
 
@@ -62,96 +59,91 @@ def run_demo():
         background = np.zeros((screen_height, screen_width, 3), dtype=np.uint8)
         background[:] = (50, 50, 50)
 
-    cv2.namedWindow("Gaze Estimation", cv2.WND_PROP_FULLSCREEN)
-    cv2.setWindowProperty(
-        "Gaze Estimation", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN
-    )
-
-    cap = cv2.VideoCapture(camera_index)
-    prev_time = time.time()
+    cam_width, cam_height = 320, 240
+    BORDER = 2
+    MARGIN = 20
     cursor_alpha = 0.0
     cursor_step = 0.05
 
-    while True:
-        ret, frame = cap.read()
-        if not ret:
-            continue
-
-        features, blink_detected = gaze_estimator.extract_features(frame)
-        if features is not None and not blink_detected:
-            gaze_point = gaze_estimator.predict(np.array([features]))[0]
-            x, y = map(int, gaze_point)
-            x_pred, y_pred = smoother.step(x, y)
-            contours = smoother.debug.get("contours", [])
-            cursor_alpha = min(cursor_alpha + cursor_step, 1.0)
-        else:
-            x_pred = y_pred = None
-            blink_detected = True
-            contours = []
-            cursor_alpha = max(cursor_alpha - cursor_step, 0.0)
-
-        canvas = background.copy()
-        if filter_method == "kde" and contours:
-            cv2.drawContours(canvas, contours, -1, (15, 182, 242), 5)
-
-        if x_pred is not None and y_pred is not None and cursor_alpha > 0:
-            overlay = canvas.copy()
-            cv2.circle(overlay, (x_pred, y_pred), 30, (0, 0, 255), -1)
-            cv2.circle(overlay, (x_pred, y_pred), 25, (255, 255, 255), -1)
-            cv2.addWeighted(
-                overlay, cursor_alpha * 0.6, canvas, 1 - cursor_alpha * 0.6, 0, canvas
-            )
-
-        small = cv2.resize(frame, (cam_width, cam_height))
-        thumb = cv2.copyMakeBorder(
-            small,
-            BORDER,
-            BORDER,
-            BORDER,
-            BORDER,
-            cv2.BORDER_CONSTANT,
-            value=(255, 255, 255),
-        )
-        h, w = thumb.shape[:2]
-        canvas[-h - MARGIN : -MARGIN, -w - MARGIN : -MARGIN] = thumb
-
-        now = time.time()
-        fps = 1 / (now - prev_time)
-        prev_time = now
-        cv2.putText(
-            canvas,
-            f"FPS: {int(fps)}",
-            (50, 50),
-            cv2.FONT_HERSHEY_SIMPLEX,
-            1.2,
-            (255, 255, 255),
-            2,
-            cv2.LINE_AA,
-        )
-        blink_txt = "Blinking" if blink_detected else "Not Blinking"
-        blink_clr = (0, 0, 255) if blink_detected else (0, 255, 0)
-        cv2.putText(
-            canvas,
-            blink_txt,
-            (50, 100),
-            cv2.FONT_HERSHEY_SIMPLEX,
-            1.2,
-            blink_clr,
-            2,
-            cv2.LINE_AA,
-        )
-
-        cv2.imshow("Gaze Estimation", canvas)
-        if cv2.waitKey(1) == 27:
-            break
-
-    cap.release()
-    cv2.destroyAllWindows()
+    with camera(camera_index) as cap, fullscreen("Gaze Estimation"):
+        prev_time = time.time()
+
+        for frame in iter_frames(cap):
+            features, blink_detected = gaze_estimator.extract_features(frame)
+
+            if features is not None and not blink_detected:
+                gaze_point = gaze_estimator.predict(np.array([features]))[0]
+                x, y = map(int, gaze_point)
+                x_pred, y_pred = smoother.step(x, y)
+                contours = smoother.debug.get("contours", [])
+                cursor_alpha = min(cursor_alpha + cursor_step, 1.0)
+            else:
+                x_pred = y_pred = None
+                blink_detected = True
+                contours = []
+                cursor_alpha = max(cursor_alpha - cursor_step, 0.0)
+
+            canvas = background.copy()
+            if filter_method == "kde" and contours:
+                cv2.drawContours(canvas, contours, -1, (15, 182, 242), 5)
+
+            if x_pred is not None and y_pred is not None and cursor_alpha > 0:
+                overlay = canvas.copy()
+                cv2.circle(overlay, (x_pred, y_pred), 30, (0, 0, 255), -1)
+                cv2.circle(overlay, (x_pred, y_pred), 25, (255, 255, 255), -1)
+                cv2.addWeighted(
+                    overlay,
+                    cursor_alpha * 0.6,
+                    canvas,
+                    1 - cursor_alpha * 0.6,
+                    0,
+                    canvas,
+                )
+
+            small = cv2.resize(frame, (cam_width, cam_height))
+            thumb = cv2.copyMakeBorder(
+                small,
+                BORDER,
+                BORDER,
+                BORDER,
+                BORDER,
+                cv2.BORDER_CONSTANT,
+                value=(255, 255, 255),
+            )
+            h, w = thumb.shape[:2]
+            canvas[-h - MARGIN : -MARGIN, -w - MARGIN : -MARGIN] = thumb
+
+            now = time.time()
+            fps = 1 / (now - prev_time)
+            prev_time = now
+            cv2.putText(
+                canvas,
+                f"FPS: {int(fps)}",
+                (50, 50),
+                cv2.FONT_HERSHEY_SIMPLEX,
+                1.2,
+                (255, 255, 255),
+                2,
+                cv2.LINE_AA,
+            )
+            blink_txt = "Blinking" if blink_detected else "Not Blinking"
+            blink_clr = (0, 0, 255) if blink_detected else (0, 255, 0)
+            cv2.putText(
+                canvas,
+                blink_txt,
+                (50, 100),
+                cv2.FONT_HERSHEY_SIMPLEX,
+                1.2,
+                blink_clr,
+                2,
+                cv2.LINE_AA,
+            )
+
+            cv2.imshow("Gaze Estimation", canvas)
+            if cv2.waitKey(1) == 27:
+                break
 
 
 if __name__ == "__main__":


@@ -1,9 +1,9 @@
 import argparse
-import time
 import cv2
 import numpy as np
 import pyvirtualcam
 from eyetrax.utils.screen import get_screen_size
+from eyetrax.utils.video import camera, iter_frames
 from eyetrax.gaze import GazeEstimator
 from eyetrax.calibration import (
     run_9_point_calibration,
@@ -51,53 +51,38 @@ def run_virtualcam():
     kalman = None
     smoother = NoSmoother()
 
-    cap = cv2.VideoCapture(camera_index)
-    if not cap.isOpened():
-        print("Error: cannot open camera.")
-        return
-
-    cam_fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
-
     green_bg = np.zeros((screen_height, screen_width, 3), dtype=np.uint8)
     green_bg[:] = (0, 255, 0)
 
-    with pyvirtualcam.Camera(
-        width=screen_width,
-        height=screen_height,
-        fps=cam_fps,
-        fmt=pyvirtualcam.PixelFormat.BGR,
-    ) as cam:
-        print(f"Virtual camera started: {cam.device}")
-
-        while True:
-            ret, frame = cap.read()
-            if not ret:
-                continue
-
-            features, blink_detected = gaze_estimator.extract_features(frame)
-
-            if features is not None and not blink_detected:
-                gaze_point = gaze_estimator.predict(np.array([features]))[0]
-                x, y = map(int, gaze_point)
-                x_pred, y_pred = smoother.step(x, y)
-                contours = smoother.debug.get("contours", [])
-            else:
-                x_pred = y_pred = None
-                contours = []
-
-            output = green_bg.copy()
-            if contours:
-                cv2.drawContours(output, contours, -1, (0, 0, 255), 3)
-            if x_pred is not None and y_pred is not None:
-                cv2.circle(output, (x_pred, y_pred), 10, (0, 0, 255), -1)
-
-            cam.send(output)
-            cam.sleep_until_next_frame()
-
-    cap.release()
-    cv2.destroyAllWindows()
+    with camera(camera_index) as cap:
+        cam_fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
+
+        with pyvirtualcam.Camera(
+            width=screen_width,
+            height=screen_height,
+            fps=cam_fps,
+            fmt=pyvirtualcam.PixelFormat.BGR,
+        ) as cam:
+            print(f"Virtual camera started: {cam.device}")
+
+            for frame in iter_frames(cap):
+                features, blink_detected = gaze_estimator.extract_features(frame)
+
+                if features is not None and not blink_detected:
+                    gaze_point = gaze_estimator.predict(np.array([features]))[0]
+                    x, y = map(int, gaze_point)
+                    x_pred, y_pred = smoother.step(x, y)
+                    contours = smoother.debug.get("contours", [])
+                else:
+                    x_pred = y_pred = None
+                    contours = []
+
+                output = green_bg.copy()
+                if contours:
+                    cv2.drawContours(output, contours, -1, (0, 0, 255), 3)
+                if x_pred is not None and y_pred is not None:
+                    cv2.circle(output, (x_pred, y_pred), 10, (0, 0, 255), -1)
+
+                cam.send(output)
+                cam.sleep_until_next_frame()
 
 
 if __name__ == "__main__":
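
Worth noting in the hunk above: the old inline check printed "Error: cannot open camera." and returned, whereas the shared camera() helper raises RuntimeError, and cam_fps is now read inside the context once the capture is known to be open. A caller wanting the old soft-failure behavior could wrap the helper like this (a sketch, not part of the commit; the capture loop is elided):

from eyetrax.utils.video import camera

def main():
    try:
        with camera(0) as cap:
            ...  # run_virtualcam's capture loop goes here
    except RuntimeError as exc:
        # camera() raises instead of printing and returning
        print(f"Error: {exc}")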

eyetrax/utils/video.py (new file)

@@ -0,0 +1,35 @@
+from __future__ import annotations
+from contextlib import contextmanager
+import cv2
+
+
+@contextmanager
+def fullscreen(name: str):
+    """Open a window in full-screen mode"""
+    cv2.namedWindow(name, cv2.WND_PROP_FULLSCREEN)
+    cv2.setWindowProperty(name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
+    try:
+        yield
+    finally:
+        cv2.destroyWindow(name)
+
+
+@contextmanager
+def camera(index: int = 0):
+    """Context manager returning an opened VideoCapture"""
+    cap = cv2.VideoCapture(index)
+    if not cap.isOpened():
+        raise RuntimeError(f"cannot open camera {index}")
+    try:
+        yield cap
+    finally:
+        cap.release()
+
+
+def iter_frames(cap: cv2.VideoCapture):
+    """Infinite generator yielding successive frames"""
+    while True:
+        ok, frame = cap.read()
+        if not ok:
+            continue
+        yield frame
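
Composed the way the refactored demo uses them, the three helpers reduce a display loop to the following (a minimal sketch; the window name and camera index are illustrative):

import cv2
from eyetrax.utils.video import camera, fullscreen, iter_frames

with camera(0) as cap, fullscreen("Preview"):
    for frame in iter_frames(cap):
        cv2.imshow("Preview", frame)
        if cv2.waitKey(1) == 27:  # Esc exits; release/destroyWindow run in the managers
            break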