Promote smoothing filters to package

This commit is contained in:
ck-zhang
2025-04-30 01:15:06 +08:00
parent 4ddc06fd3b
commit 31d25d5895
7 changed files with 182 additions and 161 deletions

View File

@@ -3,7 +3,6 @@ import cv2
import numpy as np
import argparse
import os
from scipy.stats import gaussian_kde
from eyetrax.utils.screen import get_screen_size
from eyetrax.gaze import GazeEstimator
@@ -13,7 +12,12 @@ from eyetrax.calibration import (
run_lissajous_calibration,
fine_tune_kalman_filter,
)
from eyetrax.filters import make_kalman
from eyetrax.filters import (
make_kalman,
KalmanSmoother,
KDESmoother,
NoSmoother,
)
def run_demo():
@@ -45,13 +49,19 @@ def run_demo():
else:
run_lissajous_calibration(gaze_estimator, camera_index=camera_index)
screen_width, screen_height = get_screen_size()
if filter_method == "kalman":
kalman = make_kalman()
fine_tune_kalman_filter(gaze_estimator, kalman, camera_index=camera_index)
smoother = KalmanSmoother(kalman)
elif filter_method == "kde":
kalman = None
smoother = KDESmoother(screen_width, screen_height, confidence=confidence_level)
else:
kalman = None
smoother = NoSmoother()
screen_width, screen_height = get_screen_size()
cam_width, cam_height = 320, 240
BORDER = 2
MARGIN = 20
@@ -71,10 +81,6 @@ def run_demo():
cap = cv2.VideoCapture(camera_index)
prev_time = time.time()
if filter_method == "kde":
gaze_history = []
time_window = 0.5
cursor_alpha = 0.0
cursor_step = 0.05
@@ -88,52 +94,8 @@ def run_demo():
gaze_point = gaze_estimator.predict(np.array([features]))[0]
x, y = map(int, gaze_point)
if kalman:
prediction = kalman.predict()
x_pred, y_pred = map(int, prediction[:2, 0])
x_pred = max(0, min(x_pred, screen_width - 1))
y_pred = max(0, min(y_pred, screen_height - 1))
measurement = np.array([[np.float32(x)], [np.float32(y)]])
if not np.any(kalman.statePre):
kalman.statePre[:2] = measurement
kalman.statePost[:2] = measurement
kalman.correct(measurement)
elif filter_method == "kde":
now = time.time()
gaze_history.append((now, x, y))
gaze_history = [
(t, gx, gy)
for (t, gx, gy) in gaze_history
if now - t <= time_window
]
if len(gaze_history) > 1:
arr = np.array([(gx, gy) for (_, gx, gy) in gaze_history])
try:
kde = gaussian_kde(arr.T)
xi, yi = np.mgrid[0:screen_width:320j, 0:screen_height:200j]
zi = (
kde(np.vstack([xi.ravel(), yi.ravel()])).reshape(xi.shape).T
)
flat = zi.ravel()
idx = np.argsort(flat)[::-1]
cdf = np.cumsum(flat[idx]) / flat.sum()
threshold = flat[idx[np.searchsorted(cdf, confidence_level)]]
mask = (zi >= threshold).astype(np.uint8)
mask = cv2.resize(mask, (screen_width, screen_height))
contours, _ = cv2.findContours(
mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
x_pred = int(np.mean(arr[:, 0]))
y_pred = int(np.mean(arr[:, 1]))
except np.linalg.LinAlgError:
x_pred, y_pred = x, y
contours = []
else:
x_pred, y_pred = x, y
contours = []
else:
x_pred, y_pred = x, y
contours = []
x_pred, y_pred = smoother.step(x, y)
contours = smoother.debug.get("contours", [])
cursor_alpha = min(cursor_alpha + cursor_step, 1.0)
else:

View File

@@ -3,7 +3,6 @@ import time
import cv2
import numpy as np
import pyvirtualcam
from scipy.stats import gaussian_kde
from eyetrax.utils.screen import get_screen_size
from eyetrax.gaze import GazeEstimator
@@ -13,7 +12,12 @@ from eyetrax.calibration import (
run_lissajous_calibration,
fine_tune_kalman_filter,
)
from eyetrax.filters import make_kalman
from eyetrax.filters import (
make_kalman,
KalmanSmoother,
KDESmoother,
NoSmoother,
)
def run_virtualcam():
@@ -39,12 +43,18 @@ def run_virtualcam():
else:
run_lissajous_calibration(gaze_estimator, camera_index=camera_index)
kalman = None
screen_width, screen_height = get_screen_size()
if filter_method == "kalman":
kalman = make_kalman()
fine_tune_kalman_filter(gaze_estimator, kalman, camera_index=camera_index)
screen_width, screen_height = get_screen_size()
smoother = KalmanSmoother(kalman)
elif filter_method == "kde":
kalman = None
smoother = KDESmoother(screen_width, screen_height, confidence=confidence_level)
else:
kalman = None
smoother = NoSmoother()
cap = cv2.VideoCapture(camera_index)
if not cap.isOpened():
@@ -56,14 +66,6 @@ def run_virtualcam():
green_bg = np.zeros((screen_height, screen_width, 3), dtype=np.uint8)
green_bg[:] = (0, 255, 0)
gaze_history = []
time_window = 0.5
mask_prev = mask_next = None
blend_alpha = 1.0
contours_cache = []
last_kde_x_pred = last_kde_y_pred = None
frame_count = 0
with pyvirtualcam.Camera(
width=screen_width,
height=screen_height,
@@ -78,106 +80,26 @@ def run_virtualcam():
continue
features, blink_detected = gaze_estimator.extract_features(frame)
x_pred = y_pred = None
if features is not None and not blink_detected:
gaze_point = gaze_estimator.predict(np.array([features]))[0]
x, y = map(int, gaze_point)
if kalman and filter_method == "kalman":
prediction = kalman.predict()
x_pred, y_pred = map(int, prediction[:2, 0])
x_pred = max(0, min(x_pred, screen_width - 1))
y_pred = max(0, min(y_pred, screen_height - 1))
measurement = np.array([[np.float32(x)], [np.float32(y)]])
if not np.any(kalman.statePre):
kalman.statePre[:2] = measurement
kalman.statePost[:2] = measurement
kalman.correct(measurement)
elif filter_method == "kde":
now = time.time()
gaze_history.append((now, x, y))
gaze_history = [
(t, gx, gy)
for (t, gx, gy) in gaze_history
if now - t <= time_window
]
if len(gaze_history) > 1 and frame_count % 5 == 0:
arr = np.array([(gx, gy) for (_, gx, gy) in gaze_history])
try:
kde = gaussian_kde(arr.T)
xi, yi = np.mgrid[0:screen_width:200j, 0:screen_height:120j]
zi = (
kde(np.vstack([xi.ravel(), yi.ravel()]))
.reshape(xi.shape)
.T
)
flat = zi.ravel()
idx = np.argsort(flat)[::-1]
cdf = np.cumsum(flat[idx]) / flat.sum()
threshold = flat[
idx[np.searchsorted(cdf, confidence_level)]
]
mask_new = (zi >= threshold).astype(np.uint8)
mask_new = cv2.resize(
mask_new, (screen_width, screen_height)
)
kernel = np.ones((5, 5), np.uint8)
mask_new = cv2.morphologyEx(
mask_new, cv2.MORPH_OPEN, kernel
)
mask_new = cv2.morphologyEx(
mask_new, cv2.MORPH_CLOSE, kernel
)
mask_prev = mask_next if mask_next is not None else mask_new
mask_next = mask_new
last_kde_x_pred = int(np.mean(arr[:, 0]))
last_kde_y_pred = int(np.mean(arr[:, 1]))
blend_alpha = 0.0
except np.linalg.LinAlgError:
last_kde_x_pred = int(np.mean(arr[:, 0]))
last_kde_y_pred = int(np.mean(arr[:, 1]))
x_pred = last_kde_x_pred
y_pred = last_kde_y_pred
else:
x_pred, y_pred = x, y
x_pred, y_pred = smoother.step(x, y)
contours = smoother.debug.get("contours", [])
else:
x_pred = y_pred = None
contours = []
output = green_bg.copy()
if (
filter_method == "kde"
and mask_prev is not None
and mask_next is not None
):
blend_alpha = min(blend_alpha + 0.2, 1.0)
blended = cv2.addWeighted(
mask_prev.astype(np.float32),
1.0 - blend_alpha,
mask_next.astype(np.float32),
blend_alpha,
0,
).astype(np.uint8)
kernel2 = np.ones((5, 5), np.uint8)
blended = cv2.morphologyEx(blended, cv2.MORPH_OPEN, kernel2)
blended = cv2.morphologyEx(blended, cv2.MORPH_CLOSE, kernel2)
contours, _ = cv2.findContours(
blended, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_KCOS
)
contours_cache = contours
if x_pred is not None and y_pred is not None:
cv2.circle(output, (x_pred, y_pred), 8, (0, 0, 255), -1)
if contours:
cv2.drawContours(output, contours, -1, (0, 0, 255), 3)
if filter_method == "kde" and contours_cache:
cv2.drawContours(output, contours_cache, -1, (0, 0, 255), 3)
if filter_method != "kde" and x_pred is not None and y_pred is not None:
if x_pred is not None and y_pred is not None:
cv2.circle(output, (x_pred, y_pred), 10, (0, 0, 255), -1)
cam.send(output)
cam.sleep_until_next_frame()
frame_count += 1
cap.release()
cv2.destroyAllWindows()

View File

@@ -1,5 +1,4 @@
from __future__ import annotations
import cv2
import numpy as np
@@ -16,14 +15,11 @@ def make_kalman(
Factory returning a cv2.KalmanFilter
"""
kf = cv2.KalmanFilter(state_dim, meas_dim)
kf.transitionMatrix = np.array(
[[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]],
np.float32,
)
kf.measurementMatrix = np.array(
[[1, 0, 0, 0], [0, 1, 0, 0]],
np.float32,
[[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]], dtype=np.float32
)
kf.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], dtype=np.float32)
kf.processNoiseCov = np.eye(state_dim, dtype=np.float32) * process_var
kf.measurementNoiseCov = np.eye(meas_dim, dtype=np.float32) * measurement_var
kf.errorCovPost = np.eye(state_dim, dtype=np.float32)
@@ -37,3 +33,17 @@ def make_kalman(
kf.statePost[:] = init_state
return kf
from .base import BaseSmoother
from .kalman import KalmanSmoother
from .kde import KDESmoother
from .noop import NoSmoother
__all__ = [
"make_kalman",
"BaseSmoother",
"KalmanSmoother",
"KDESmoother",
"NoSmoother",
]

View File

@@ -0,0 +1,13 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Tuple
class BaseSmoother(ABC):
def __init__(self) -> None:
self.debug: dict = {}
@abstractmethod
def step(self, x: int, y: int) -> Tuple[int, int]: ...

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
from typing import Tuple
import numpy as np
from .base import BaseSmoother
from . import make_kalman
class KalmanSmoother(BaseSmoother):
def __init__(self, kf=None) -> None:
super().__init__()
try:
import cv2
self.kf = kf if isinstance(kf, cv2.KalmanFilter) else make_kalman()
except ImportError:
self.kf = make_kalman()
def step(self, x: int, y: int) -> Tuple[int, int]:
meas = np.array([[float(x)], [float(y)]], dtype=np.float32)
if not np.any(self.kf.statePost):
self.kf.statePre[:2] = meas
self.kf.statePost[:2] = meas
pred = self.kf.predict()
self.kf.correct(meas)
return int(pred[0, 0]), int(pred[1, 0])

View File

@@ -0,0 +1,72 @@
from __future__ import annotations
import time
from collections import deque
from typing import Deque, Tuple
import cv2
import numpy as np
from scipy.stats import gaussian_kde
from .base import BaseSmoother
class KDESmoother(BaseSmoother):
def __init__(
self,
screen_w: int,
screen_h: int,
*,
time_window: float = 0.5,
confidence: float = 0.5,
grid: Tuple[int, int] = (320, 200),
) -> None:
super().__init__()
self.sw, self.sh = screen_w, screen_h
self.window = time_window
self.conf = confidence
self.grid = grid
self.hist: Deque[Tuple[float, int, int]] = deque()
def step(self, x: int, y: int) -> Tuple[int, int]:
now = time.time()
self.hist.append((now, x, y))
while self.hist and now - self.hist[0][0] > self.window:
self.hist.popleft()
pts = np.asarray([(hx, hy) for (_, hx, hy) in self.hist])
if pts.shape[0] < 2:
self.debug.clear()
return x, y
try:
kde = gaussian_kde(pts.T)
xi, yi = np.mgrid[
0 : self.sw : complex(self.grid[0]),
0 : self.sh : complex(self.grid[1]),
]
zi = kde(np.vstack([xi.ravel(), yi.ravel()])).reshape(xi.shape).T
flat = zi.ravel()
idx = np.argsort(flat)[::-1]
cdf = np.cumsum(flat[idx]) / flat.sum()
thr = flat[idx[np.searchsorted(cdf, self.conf)]]
mask = (zi >= thr).astype(np.uint8)
mask = cv2.resize(mask, (self.sw, self.sh))
contours, _ = cv2.findContours(
mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
self.debug["mask"] = mask
self.debug["contours"] = contours
sx, sy = pts.mean(axis=0).astype(int)
return int(sx), int(sy)
except np.linalg.LinAlgError:
self.debug.clear()
return x, y

View File

@@ -0,0 +1,11 @@
from __future__ import annotations
from typing import Tuple
from .base import BaseSmoother
class NoSmoother(BaseSmoother):
def step(self, x: int, y: int) -> Tuple[int, int]:
return x, y