From 31d25d5895558a9f068f1aef02404ee376632cd1 Mon Sep 17 00:00:00 2001 From: ck-zhang Date: Wed, 30 Apr 2025 01:15:06 +0800 Subject: [PATCH] Promote smoothing filters to package --- src/eyetrax/app/demo.py | 68 +++------- src/eyetrax/app/virtualcam.py | 124 ++++-------------- .../{filters.py => filters/__init__.py} | 24 +++- src/eyetrax/filters/base.py | 13 ++ src/eyetrax/filters/kalman.py | 31 +++++ src/eyetrax/filters/kde.py | 72 ++++++++++ src/eyetrax/filters/noop.py | 11 ++ 7 files changed, 182 insertions(+), 161 deletions(-) rename src/eyetrax/{filters.py => filters/__init__.py} (74%) create mode 100644 src/eyetrax/filters/base.py create mode 100644 src/eyetrax/filters/kalman.py create mode 100644 src/eyetrax/filters/kde.py create mode 100644 src/eyetrax/filters/noop.py diff --git a/src/eyetrax/app/demo.py b/src/eyetrax/app/demo.py index ff0a705..09d679e 100644 --- a/src/eyetrax/app/demo.py +++ b/src/eyetrax/app/demo.py @@ -3,7 +3,6 @@ import cv2 import numpy as np import argparse import os -from scipy.stats import gaussian_kde from eyetrax.utils.screen import get_screen_size from eyetrax.gaze import GazeEstimator @@ -13,7 +12,12 @@ from eyetrax.calibration import ( run_lissajous_calibration, fine_tune_kalman_filter, ) -from eyetrax.filters import make_kalman +from eyetrax.filters import ( + make_kalman, + KalmanSmoother, + KDESmoother, + NoSmoother, +) def run_demo(): @@ -45,13 +49,19 @@ def run_demo(): else: run_lissajous_calibration(gaze_estimator, camera_index=camera_index) + screen_width, screen_height = get_screen_size() + if filter_method == "kalman": kalman = make_kalman() fine_tune_kalman_filter(gaze_estimator, kalman, camera_index=camera_index) + smoother = KalmanSmoother(kalman) + elif filter_method == "kde": + kalman = None + smoother = KDESmoother(screen_width, screen_height, confidence=confidence_level) else: kalman = None + smoother = NoSmoother() - screen_width, screen_height = get_screen_size() cam_width, cam_height = 320, 240 BORDER = 2 MARGIN = 20 @@ -71,10 +81,6 @@ def run_demo(): cap = cv2.VideoCapture(camera_index) prev_time = time.time() - if filter_method == "kde": - gaze_history = [] - time_window = 0.5 - cursor_alpha = 0.0 cursor_step = 0.05 @@ -88,52 +94,8 @@ def run_demo(): gaze_point = gaze_estimator.predict(np.array([features]))[0] x, y = map(int, gaze_point) - if kalman: - prediction = kalman.predict() - x_pred, y_pred = map(int, prediction[:2, 0]) - x_pred = max(0, min(x_pred, screen_width - 1)) - y_pred = max(0, min(y_pred, screen_height - 1)) - measurement = np.array([[np.float32(x)], [np.float32(y)]]) - if not np.any(kalman.statePre): - kalman.statePre[:2] = measurement - kalman.statePost[:2] = measurement - kalman.correct(measurement) - elif filter_method == "kde": - now = time.time() - gaze_history.append((now, x, y)) - gaze_history = [ - (t, gx, gy) - for (t, gx, gy) in gaze_history - if now - t <= time_window - ] - if len(gaze_history) > 1: - arr = np.array([(gx, gy) for (_, gx, gy) in gaze_history]) - try: - kde = gaussian_kde(arr.T) - xi, yi = np.mgrid[0:screen_width:320j, 0:screen_height:200j] - zi = ( - kde(np.vstack([xi.ravel(), yi.ravel()])).reshape(xi.shape).T - ) - flat = zi.ravel() - idx = np.argsort(flat)[::-1] - cdf = np.cumsum(flat[idx]) / flat.sum() - threshold = flat[idx[np.searchsorted(cdf, confidence_level)]] - mask = (zi >= threshold).astype(np.uint8) - mask = cv2.resize(mask, (screen_width, screen_height)) - contours, _ = cv2.findContours( - mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE - ) - x_pred = int(np.mean(arr[:, 0])) - y_pred = int(np.mean(arr[:, 1])) - except np.linalg.LinAlgError: - x_pred, y_pred = x, y - contours = [] - else: - x_pred, y_pred = x, y - contours = [] - else: - x_pred, y_pred = x, y - contours = [] + x_pred, y_pred = smoother.step(x, y) + contours = smoother.debug.get("contours", []) cursor_alpha = min(cursor_alpha + cursor_step, 1.0) else: diff --git a/src/eyetrax/app/virtualcam.py b/src/eyetrax/app/virtualcam.py index b33f985..63e7aed 100644 --- a/src/eyetrax/app/virtualcam.py +++ b/src/eyetrax/app/virtualcam.py @@ -3,7 +3,6 @@ import time import cv2 import numpy as np import pyvirtualcam -from scipy.stats import gaussian_kde from eyetrax.utils.screen import get_screen_size from eyetrax.gaze import GazeEstimator @@ -13,7 +12,12 @@ from eyetrax.calibration import ( run_lissajous_calibration, fine_tune_kalman_filter, ) -from eyetrax.filters import make_kalman +from eyetrax.filters import ( + make_kalman, + KalmanSmoother, + KDESmoother, + NoSmoother, +) def run_virtualcam(): @@ -39,12 +43,18 @@ def run_virtualcam(): else: run_lissajous_calibration(gaze_estimator, camera_index=camera_index) - kalman = None + screen_width, screen_height = get_screen_size() + if filter_method == "kalman": kalman = make_kalman() fine_tune_kalman_filter(gaze_estimator, kalman, camera_index=camera_index) - - screen_width, screen_height = get_screen_size() + smoother = KalmanSmoother(kalman) + elif filter_method == "kde": + kalman = None + smoother = KDESmoother(screen_width, screen_height, confidence=confidence_level) + else: + kalman = None + smoother = NoSmoother() cap = cv2.VideoCapture(camera_index) if not cap.isOpened(): @@ -56,14 +66,6 @@ def run_virtualcam(): green_bg = np.zeros((screen_height, screen_width, 3), dtype=np.uint8) green_bg[:] = (0, 255, 0) - gaze_history = [] - time_window = 0.5 - mask_prev = mask_next = None - blend_alpha = 1.0 - contours_cache = [] - last_kde_x_pred = last_kde_y_pred = None - frame_count = 0 - with pyvirtualcam.Camera( width=screen_width, height=screen_height, @@ -78,106 +80,26 @@ def run_virtualcam(): continue features, blink_detected = gaze_estimator.extract_features(frame) - x_pred = y_pred = None if features is not None and not blink_detected: gaze_point = gaze_estimator.predict(np.array([features]))[0] x, y = map(int, gaze_point) - - if kalman and filter_method == "kalman": - prediction = kalman.predict() - x_pred, y_pred = map(int, prediction[:2, 0]) - x_pred = max(0, min(x_pred, screen_width - 1)) - y_pred = max(0, min(y_pred, screen_height - 1)) - measurement = np.array([[np.float32(x)], [np.float32(y)]]) - if not np.any(kalman.statePre): - kalman.statePre[:2] = measurement - kalman.statePost[:2] = measurement - kalman.correct(measurement) - - elif filter_method == "kde": - now = time.time() - gaze_history.append((now, x, y)) - gaze_history = [ - (t, gx, gy) - for (t, gx, gy) in gaze_history - if now - t <= time_window - ] - if len(gaze_history) > 1 and frame_count % 5 == 0: - arr = np.array([(gx, gy) for (_, gx, gy) in gaze_history]) - try: - kde = gaussian_kde(arr.T) - xi, yi = np.mgrid[0:screen_width:200j, 0:screen_height:120j] - zi = ( - kde(np.vstack([xi.ravel(), yi.ravel()])) - .reshape(xi.shape) - .T - ) - flat = zi.ravel() - idx = np.argsort(flat)[::-1] - cdf = np.cumsum(flat[idx]) / flat.sum() - threshold = flat[ - idx[np.searchsorted(cdf, confidence_level)] - ] - mask_new = (zi >= threshold).astype(np.uint8) - mask_new = cv2.resize( - mask_new, (screen_width, screen_height) - ) - kernel = np.ones((5, 5), np.uint8) - mask_new = cv2.morphologyEx( - mask_new, cv2.MORPH_OPEN, kernel - ) - mask_new = cv2.morphologyEx( - mask_new, cv2.MORPH_CLOSE, kernel - ) - mask_prev = mask_next if mask_next is not None else mask_new - mask_next = mask_new - last_kde_x_pred = int(np.mean(arr[:, 0])) - last_kde_y_pred = int(np.mean(arr[:, 1])) - blend_alpha = 0.0 - except np.linalg.LinAlgError: - last_kde_x_pred = int(np.mean(arr[:, 0])) - last_kde_y_pred = int(np.mean(arr[:, 1])) - x_pred = last_kde_x_pred - y_pred = last_kde_y_pred - - else: - x_pred, y_pred = x, y + x_pred, y_pred = smoother.step(x, y) + contours = smoother.debug.get("contours", []) + else: + x_pred = y_pred = None + contours = [] output = green_bg.copy() - if ( - filter_method == "kde" - and mask_prev is not None - and mask_next is not None - ): - blend_alpha = min(blend_alpha + 0.2, 1.0) - blended = cv2.addWeighted( - mask_prev.astype(np.float32), - 1.0 - blend_alpha, - mask_next.astype(np.float32), - blend_alpha, - 0, - ).astype(np.uint8) - kernel2 = np.ones((5, 5), np.uint8) - blended = cv2.morphologyEx(blended, cv2.MORPH_OPEN, kernel2) - blended = cv2.morphologyEx(blended, cv2.MORPH_CLOSE, kernel2) - contours, _ = cv2.findContours( - blended, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_KCOS - ) - contours_cache = contours - if x_pred is not None and y_pred is not None: - cv2.circle(output, (x_pred, y_pred), 8, (0, 0, 255), -1) + if contours: + cv2.drawContours(output, contours, -1, (0, 0, 255), 3) - if filter_method == "kde" and contours_cache: - cv2.drawContours(output, contours_cache, -1, (0, 0, 255), 3) - - if filter_method != "kde" and x_pred is not None and y_pred is not None: + if x_pred is not None and y_pred is not None: cv2.circle(output, (x_pred, y_pred), 10, (0, 0, 255), -1) cam.send(output) cam.sleep_until_next_frame() - frame_count += 1 cap.release() cv2.destroyAllWindows() diff --git a/src/eyetrax/filters.py b/src/eyetrax/filters/__init__.py similarity index 74% rename from src/eyetrax/filters.py rename to src/eyetrax/filters/__init__.py index f5ba9d6..207bae7 100644 --- a/src/eyetrax/filters.py +++ b/src/eyetrax/filters/__init__.py @@ -1,5 +1,4 @@ from __future__ import annotations - import cv2 import numpy as np @@ -16,14 +15,11 @@ def make_kalman( Factory returning a cv2.KalmanFilter """ kf = cv2.KalmanFilter(state_dim, meas_dim) + kf.transitionMatrix = np.array( - [[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]], - np.float32, - ) - kf.measurementMatrix = np.array( - [[1, 0, 0, 0], [0, 1, 0, 0]], - np.float32, + [[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]], dtype=np.float32 ) + kf.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], dtype=np.float32) kf.processNoiseCov = np.eye(state_dim, dtype=np.float32) * process_var kf.measurementNoiseCov = np.eye(meas_dim, dtype=np.float32) * measurement_var kf.errorCovPost = np.eye(state_dim, dtype=np.float32) @@ -37,3 +33,17 @@ def make_kalman( kf.statePost[:] = init_state return kf + + +from .base import BaseSmoother +from .kalman import KalmanSmoother +from .kde import KDESmoother +from .noop import NoSmoother + +__all__ = [ + "make_kalman", + "BaseSmoother", + "KalmanSmoother", + "KDESmoother", + "NoSmoother", +] diff --git a/src/eyetrax/filters/base.py b/src/eyetrax/filters/base.py new file mode 100644 index 0000000..90c66b9 --- /dev/null +++ b/src/eyetrax/filters/base.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Tuple + + +class BaseSmoother(ABC): + + def __init__(self) -> None: + self.debug: dict = {} + + @abstractmethod + def step(self, x: int, y: int) -> Tuple[int, int]: ... diff --git a/src/eyetrax/filters/kalman.py b/src/eyetrax/filters/kalman.py new file mode 100644 index 0000000..a88bc16 --- /dev/null +++ b/src/eyetrax/filters/kalman.py @@ -0,0 +1,31 @@ +from __future__ import annotations +from typing import Tuple +import numpy as np + +from .base import BaseSmoother +from . import make_kalman + + +class KalmanSmoother(BaseSmoother): + + def __init__(self, kf=None) -> None: + super().__init__() + + try: + import cv2 + + self.kf = kf if isinstance(kf, cv2.KalmanFilter) else make_kalman() + except ImportError: + self.kf = make_kalman() + + def step(self, x: int, y: int) -> Tuple[int, int]: + meas = np.array([[float(x)], [float(y)]], dtype=np.float32) + + if not np.any(self.kf.statePost): + self.kf.statePre[:2] = meas + self.kf.statePost[:2] = meas + + pred = self.kf.predict() + self.kf.correct(meas) + + return int(pred[0, 0]), int(pred[1, 0]) diff --git a/src/eyetrax/filters/kde.py b/src/eyetrax/filters/kde.py new file mode 100644 index 0000000..62a8887 --- /dev/null +++ b/src/eyetrax/filters/kde.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import time +from collections import deque +from typing import Deque, Tuple + +import cv2 +import numpy as np +from scipy.stats import gaussian_kde + +from .base import BaseSmoother + + +class KDESmoother(BaseSmoother): + + def __init__( + self, + screen_w: int, + screen_h: int, + *, + time_window: float = 0.5, + confidence: float = 0.5, + grid: Tuple[int, int] = (320, 200), + ) -> None: + super().__init__() + self.sw, self.sh = screen_w, screen_h + self.window = time_window + self.conf = confidence + self.grid = grid + self.hist: Deque[Tuple[float, int, int]] = deque() + + def step(self, x: int, y: int) -> Tuple[int, int]: + now = time.time() + + self.hist.append((now, x, y)) + while self.hist and now - self.hist[0][0] > self.window: + self.hist.popleft() + + pts = np.asarray([(hx, hy) for (_, hx, hy) in self.hist]) + if pts.shape[0] < 2: + self.debug.clear() + return x, y + + try: + kde = gaussian_kde(pts.T) + xi, yi = np.mgrid[ + 0 : self.sw : complex(self.grid[0]), + 0 : self.sh : complex(self.grid[1]), + ] + zi = kde(np.vstack([xi.ravel(), yi.ravel()])).reshape(xi.shape).T + + flat = zi.ravel() + idx = np.argsort(flat)[::-1] + cdf = np.cumsum(flat[idx]) / flat.sum() + thr = flat[idx[np.searchsorted(cdf, self.conf)]] + + mask = (zi >= thr).astype(np.uint8) + mask = cv2.resize(mask, (self.sw, self.sh)) + + contours, _ = cv2.findContours( + mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE + ) + + self.debug["mask"] = mask + self.debug["contours"] = contours + + sx, sy = pts.mean(axis=0).astype(int) + return int(sx), int(sy) + + except np.linalg.LinAlgError: + self.debug.clear() + return x, y diff --git a/src/eyetrax/filters/noop.py b/src/eyetrax/filters/noop.py new file mode 100644 index 0000000..85b5589 --- /dev/null +++ b/src/eyetrax/filters/noop.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from typing import Tuple + +from .base import BaseSmoother + + +class NoSmoother(BaseSmoother): + + def step(self, x: int, y: int) -> Tuple[int, int]: + return x, y