diff --git a/src/eyetrax/app/demo.py b/src/eyetrax/app/demo.py
index 7949855..e31e506 100644
--- a/src/eyetrax/app/demo.py
+++ b/src/eyetrax/app/demo.py
@@ -3,6 +3,7 @@ import cv2
 import numpy as np
 import os
 from eyetrax.utils.screen import get_screen_size
+from eyetrax.utils.video import camera, fullscreen, iter_frames
 from eyetrax.gaze import GazeEstimator
 from eyetrax.calibration import (
     run_9_point_calibration,
@@ -51,10 +52,6 @@ def run_demo():
         kalman = None
         smoother = NoSmoother()
 
-    cam_width, cam_height = 320, 240
-    BORDER = 2
-    MARGIN = 20
-
     if background_path and os.path.isfile(background_path):
         background = cv2.imread(background_path)
         background = cv2.resize(background, (screen_width, screen_height))
@@ -62,96 +59,91 @@ def run_demo():
         background = np.zeros((screen_height, screen_width, 3), dtype=np.uint8)
         background[:] = (50, 50, 50)
 
-    cv2.namedWindow("Gaze Estimation", cv2.WND_PROP_FULLSCREEN)
-    cv2.setWindowProperty(
-        "Gaze Estimation", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN
-    )
-
-    cap = cv2.VideoCapture(camera_index)
-    prev_time = time.time()
-
+    cam_width, cam_height = 320, 240
+    BORDER = 2
+    MARGIN = 20
     cursor_alpha = 0.0
     cursor_step = 0.05
 
-    while True:
-        ret, frame = cap.read()
-        if not ret:
-            continue
+    with camera(camera_index) as cap, fullscreen("Gaze Estimation"):
+        prev_time = time.time()
 
-        features, blink_detected = gaze_estimator.extract_features(frame)
-        if features is not None and not blink_detected:
-            gaze_point = gaze_estimator.predict(np.array([features]))[0]
-            x, y = map(int, gaze_point)
+        for frame in iter_frames(cap):
+            features, blink_detected = gaze_estimator.extract_features(frame)
 
-            x_pred, y_pred = smoother.step(x, y)
-            contours = smoother.debug.get("contours", [])
+            if features is not None and not blink_detected:
+                gaze_point = gaze_estimator.predict(np.array([features]))[0]
+                x, y = map(int, gaze_point)
+                x_pred, y_pred = smoother.step(x, y)
+                contours = smoother.debug.get("contours", [])
+                cursor_alpha = min(cursor_alpha + cursor_step, 1.0)
+            else:
+                x_pred = y_pred = None
+                blink_detected = True
+                contours = []
+                cursor_alpha = max(cursor_alpha - cursor_step, 0.0)
 
-            cursor_alpha = min(cursor_alpha + cursor_step, 1.0)
-        else:
-            x_pred = y_pred = None
-            blink_detected = True
-            contours = []
-            cursor_alpha = max(cursor_alpha - cursor_step, 0.0)
+            canvas = background.copy()
 
-        canvas = background.copy()
+            if filter_method == "kde" and contours:
+                cv2.drawContours(canvas, contours, -1, (15, 182, 242), 5)
 
-        if filter_method == "kde" and contours:
-            cv2.drawContours(canvas, contours, -1, (15, 182, 242), 5)
+            if x_pred is not None and y_pred is not None and cursor_alpha > 0:
+                overlay = canvas.copy()
+                cv2.circle(overlay, (x_pred, y_pred), 30, (0, 0, 255), -1)
+                cv2.circle(overlay, (x_pred, y_pred), 25, (255, 255, 255), -1)
+                cv2.addWeighted(
+                    overlay,
+                    cursor_alpha * 0.6,
+                    canvas,
+                    1 - cursor_alpha * 0.6,
+                    0,
+                    canvas,
+                )
 
-        if x_pred is not None and y_pred is not None and cursor_alpha > 0:
-            overlay = canvas.copy()
-            cv2.circle(overlay, (x_pred, y_pred), 30, (0, 0, 255), -1)
-            cv2.circle(overlay, (x_pred, y_pred), 25, (255, 255, 255), -1)
-            cv2.addWeighted(
-                overlay, cursor_alpha * 0.6, canvas, 1 - cursor_alpha * 0.6, 0, canvas
+            small = cv2.resize(frame, (cam_width, cam_height))
+            thumb = cv2.copyMakeBorder(
+                small,
+                BORDER,
+                BORDER,
+                BORDER,
+                BORDER,
+                cv2.BORDER_CONSTANT,
+                value=(255, 255, 255),
+            )
+            h, w = thumb.shape[:2]
+            canvas[-h - MARGIN : -MARGIN, -w - MARGIN : -MARGIN] = thumb
+
+            now = time.time()
+            fps = 1 / (now - prev_time)
+            prev_time = now
+
+            cv2.putText(
+                canvas,
+                f"FPS: {int(fps)}",
+                (50, 50),
+                cv2.FONT_HERSHEY_SIMPLEX,
+                1.2,
+                (255, 255, 255),
+                2,
+                cv2.LINE_AA,
+            )
+            blink_txt = "Blinking" if blink_detected else "Not Blinking"
+            blink_clr = (0, 0, 255) if blink_detected else (0, 255, 0)
+            cv2.putText(
+                canvas,
+                blink_txt,
+                (50, 100),
+                cv2.FONT_HERSHEY_SIMPLEX,
+                1.2,
+                blink_clr,
+                2,
+                cv2.LINE_AA,
             )
-        small = cv2.resize(frame, (cam_width, cam_height))
-        thumb = cv2.copyMakeBorder(
-            small,
-            BORDER,
-            BORDER,
-            BORDER,
-            BORDER,
-            cv2.BORDER_CONSTANT,
-            value=(255, 255, 255),
-        )
-        h, w = thumb.shape[:2]
-        canvas[-h - MARGIN : -MARGIN, -w - MARGIN : -MARGIN] = thumb
-
-        now = time.time()
-        fps = 1 / (now - prev_time)
-        prev_time = now
-
-        cv2.putText(
-            canvas,
-            f"FPS: {int(fps)}",
-            (50, 50),
-            cv2.FONT_HERSHEY_SIMPLEX,
-            1.2,
-            (255, 255, 255),
-            2,
-            cv2.LINE_AA,
-        )
-        blink_txt = "Blinking" if blink_detected else "Not Blinking"
-        blink_clr = (0, 0, 255) if blink_detected else (0, 255, 0)
-        cv2.putText(
-            canvas,
-            blink_txt,
-            (50, 100),
-            cv2.FONT_HERSHEY_SIMPLEX,
-            1.2,
-            blink_clr,
-            2,
-            cv2.LINE_AA,
-        )
-
-        cv2.imshow("Gaze Estimation", canvas)
-        if cv2.waitKey(1) == 27:
-            break
-
-    cap.release()
-    cv2.destroyAllWindows()
+            cv2.imshow("Gaze Estimation", canvas)
+            if cv2.waitKey(1) == 27:
+                break
 
 
 if __name__ == "__main__":
diff --git a/src/eyetrax/app/virtualcam.py b/src/eyetrax/app/virtualcam.py
index 1254abd..15dbaaf 100644
--- a/src/eyetrax/app/virtualcam.py
+++ b/src/eyetrax/app/virtualcam.py
@@ -1,9 +1,9 @@
-import argparse
-import time
 import cv2
 import numpy as np
 import pyvirtualcam
+
 from eyetrax.utils.screen import get_screen_size
+from eyetrax.utils.video import camera, iter_frames
 from eyetrax.gaze import GazeEstimator
 from eyetrax.calibration import (
     run_9_point_calibration,
@@ -51,53 +51,38 @@ def run_virtualcam():
         kalman = None
         smoother = NoSmoother()
 
-    cap = cv2.VideoCapture(camera_index)
-    if not cap.isOpened():
-        print("Error: cannot open camera.")
-        return
-
-    cam_fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
-
     green_bg = np.zeros((screen_height, screen_width, 3), dtype=np.uint8)
     green_bg[:] = (0, 255, 0)
 
-    with pyvirtualcam.Camera(
-        width=screen_width,
-        height=screen_height,
-        fps=cam_fps,
-        fmt=pyvirtualcam.PixelFormat.BGR,
-    ) as cam:
-        print(f"Virtual camera started: {cam.device}")
+    with camera(camera_index) as cap:
+        cam_fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
+        with pyvirtualcam.Camera(
+            width=screen_width,
+            height=screen_height,
+            fps=cam_fps,
+            fmt=pyvirtualcam.PixelFormat.BGR,
+        ) as cam:
+            print(f"Virtual camera started: {cam.device}")
+            for frame in iter_frames(cap):
+                features, blink_detected = gaze_estimator.extract_features(frame)
 
-        while True:
-            ret, frame = cap.read()
-            if not ret:
-                continue
+                if features is not None and not blink_detected:
+                    gaze_point = gaze_estimator.predict(np.array([features]))[0]
+                    x, y = map(int, gaze_point)
+                    x_pred, y_pred = smoother.step(x, y)
+                    contours = smoother.debug.get("contours", [])
+                else:
+                    x_pred = y_pred = None
+                    contours = []
 
-            features, blink_detected = gaze_estimator.extract_features(frame)
+                output = green_bg.copy()
+                if contours:
+                    cv2.drawContours(output, contours, -1, (0, 0, 255), 3)
+                if x_pred is not None and y_pred is not None:
+                    cv2.circle(output, (x_pred, y_pred), 10, (0, 0, 255), -1)
 
-            if features is not None and not blink_detected:
-                gaze_point = gaze_estimator.predict(np.array([features]))[0]
-                x, y = map(int, gaze_point)
-                x_pred, y_pred = smoother.step(x, y)
-                contours = smoother.debug.get("contours", [])
-            else:
-                x_pred = y_pred = None
-                contours = []
-
-            output = green_bg.copy()
-
-            if contours:
-                cv2.drawContours(output, contours, -1, (0, 0, 255), 3)
-
-            if x_pred is not None and y_pred is not None:
-                cv2.circle(output, (x_pred, y_pred), 10, (0, 0, 255), -1)
-
-            cam.send(output)
-            cam.sleep_until_next_frame()
-
-    cap.release()
-    cv2.destroyAllWindows()
+                cam.send(output)
+                cam.sleep_until_next_frame()
 
 
 if __name__ == "__main__":
diff --git a/src/eyetrax/utils/video.py b/src/eyetrax/utils/video.py
new file mode 100644
index 0000000..e3dc211
--- /dev/null
+++ b/src/eyetrax/utils/video.py
@@ -0,0 +1,35 @@
+from __future__ import annotations
+from contextlib import contextmanager
+import cv2
+
+
+@contextmanager
+def fullscreen(name: str):
+    """Open a window in full-screen mode"""
+    cv2.namedWindow(name, cv2.WND_PROP_FULLSCREEN)
+    cv2.setWindowProperty(name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
+    try:
+        yield
+    finally:
+        cv2.destroyWindow(name)
+
+
+@contextmanager
+def camera(index: int = 0):
+    """Context manager returning an opened VideoCapture"""
+    cap = cv2.VideoCapture(index)
+    if not cap.isOpened():
+        raise RuntimeError(f"cannot open camera {index}")
+    try:
+        yield cap
+    finally:
+        cap.release()
+
+
+def iter_frames(cap: cv2.VideoCapture):
+    """Infinite generator yielding successive frames"""
+    while True:
+        ok, frame = cap.read()
+        if not ok:
+            continue
+        yield frame
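For reference, the three helpers in `src/eyetrax/utils/video.py` are designed to compose: `camera` guarantees `cap.release()`, `fullscreen` guarantees `cv2.destroyWindow()`, and `iter_frames` hides the read-retry loop. A minimal usage sketch follows; the window title and camera index are illustrative, not part of the diff:

```python
import cv2

from eyetrax.utils.video import camera, fullscreen, iter_frames

# Open camera 0 and a full-screen window; both are released/destroyed on exit,
# even if the loop body raises.
with camera(0) as cap, fullscreen("Preview"):
    for frame in iter_frames(cap):  # skips failed reads, never ends on its own
        cv2.imshow("Preview", frame)
        if cv2.waitKey(1) == 27:  # ESC breaks the loop; cleanup is automatic
            break
```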