mirror of
https://github.com/trycua/computer.git
synced 2026-01-10 23:40:03 -06:00
code cleanup
This commit is contained in:
@@ -69,7 +69,8 @@ last_known_cursor_position = None
|
||||
last_known_thought = None
|
||||
|
||||
def parse_agent_response(filename_or_turn_dir):
|
||||
"""Parse agent response JSON file to extract text, actions, and cursor positions."""
|
||||
"""Parse agent response JSON file to extract text, actions, cursor positions, thought, and action type."""
|
||||
global last_known_cursor_position, last_known_thought
|
||||
|
||||
# Check if we're getting a filename or turn directory
|
||||
if os.path.isdir(filename_or_turn_dir):
|
||||
@@ -83,7 +84,9 @@ def parse_agent_response(filename_or_turn_dir):
|
||||
result = {
|
||||
"text": [],
|
||||
"actions": [],
|
||||
"cursor_positions": []
|
||||
"cursor_positions": [],
|
||||
"thought": None,
|
||||
"action_type": "normal"
|
||||
}
|
||||
|
||||
for agent_file in agent_response_files:
|
||||
@@ -125,92 +128,32 @@ def parse_agent_response(filename_or_turn_dir):
|
||||
result["actions"].append(action)
|
||||
# Extract cursor position if available
|
||||
if action.get("x") is not None and action.get("y") is not None:
|
||||
result["cursor_positions"].append((action.get("x"), action.get("y")))
|
||||
position = (action.get("x"), action.get("y"))
|
||||
result["cursor_positions"].append(position)
|
||||
last_known_cursor_position = position
|
||||
|
||||
# Determine action type
|
||||
action_type = action.get("type", "")
|
||||
if action_type == "click":
|
||||
result["action_type"] = "clicking"
|
||||
elif action_type == "type" or action_type == "input":
|
||||
result["action_type"] = "typing"
|
||||
except Exception as e:
|
||||
print(f"Error processing {agent_file}: {e}")
|
||||
|
||||
# Set thought from text if available
|
||||
if result["text"]:
|
||||
result["thought"] = ' '.join(result["text"])
|
||||
last_known_thought = result["thought"]
|
||||
else:
|
||||
result["thought"] = last_known_thought
|
||||
|
||||
# Set cursor position if not found
|
||||
if not result["cursor_positions"]:
|
||||
result["cursor_positions"] = [last_known_cursor_position] if last_known_cursor_position else []
|
||||
|
||||
return result
|
||||
|
||||
def extract_thought_from_agent_response(filename_or_turn_dir):
|
||||
"""Extract thought from agent response for the current frame."""
|
||||
global last_known_thought
|
||||
|
||||
agent_response = parse_agent_response(filename_or_turn_dir)
|
||||
|
||||
if agent_response["text"]:
|
||||
# Use the first text entry as the thought
|
||||
last_known_thought = agent_response["text"][0]
|
||||
return last_known_thought
|
||||
|
||||
# Return the last known thought if no new thought is found
|
||||
return last_known_thought
|
||||
|
||||
def extract_cursor_position_from_agent_response(filename_or_turn_dir):
|
||||
"""Extract cursor position from agent response."""
|
||||
global last_known_cursor_position
|
||||
|
||||
# Check if we're getting a filename or turn directory
|
||||
if os.path.isdir(filename_or_turn_dir):
|
||||
turn_dir = filename_or_turn_dir
|
||||
else:
|
||||
turn_dir = os.path.dirname(filename_or_turn_dir)
|
||||
|
||||
# Find agent response files in the turn directory
|
||||
agent_response_files = [f for f in os.listdir(turn_dir) if f.endswith('_agent_response.json')]
|
||||
|
||||
for agent_file in agent_response_files:
|
||||
try:
|
||||
with open(os.path.join(turn_dir, agent_file), 'r') as f:
|
||||
data = json.load(f)
|
||||
response_data = data.get('response', {})
|
||||
|
||||
# Process outputs array if present
|
||||
outputs = response_data.get("output", [])
|
||||
for output in outputs:
|
||||
if output.get("type") == "computer_call":
|
||||
action = output.get("action", {})
|
||||
if action.get("x") is not None and action.get("y") is not None:
|
||||
position = (action.get("x"), action.get("y"))
|
||||
last_known_cursor_position = position
|
||||
return position
|
||||
except Exception as e:
|
||||
print(f"Error processing {agent_file}: {e}")
|
||||
|
||||
# No position found in agent response, return the last known position
|
||||
return last_known_cursor_position
|
||||
|
||||
def extract_action_from_agent_response(filename_or_turn_dir):
|
||||
"""Determine the action type from agent response."""
|
||||
# Check if we're getting a filename or turn directory
|
||||
if os.path.isdir(filename_or_turn_dir):
|
||||
turn_dir = filename_or_turn_dir
|
||||
else:
|
||||
turn_dir = os.path.dirname(filename_or_turn_dir)
|
||||
|
||||
# Find agent response files in the turn directory
|
||||
agent_response_files = [f for f in os.listdir(turn_dir) if f.endswith('_agent_response.json')]
|
||||
|
||||
for agent_file in agent_response_files:
|
||||
try:
|
||||
with open(os.path.join(turn_dir, agent_file), 'r') as f:
|
||||
data = json.load(f)
|
||||
response_data = data.get('response', {})
|
||||
|
||||
# Process outputs array if present
|
||||
outputs = response_data.get("output", [])
|
||||
for output in outputs:
|
||||
if output.get("type") == "computer_call":
|
||||
action = output.get("action", {})
|
||||
action_type = action.get("type", "")
|
||||
if action_type == "click":
|
||||
return "clicking"
|
||||
elif action_type == "type" or action_type == "input":
|
||||
return "typing"
|
||||
except Exception as e:
|
||||
print(f"Error processing {agent_file}: {e}")
|
||||
|
||||
return "normal"
|
||||
|
||||
def create_animated_vignette(image, frame_index):
|
||||
"""
|
||||
Create an animated purple/blue gradient vignette effect around the border of the image.
|
||||
@@ -588,12 +531,13 @@ def process_trajectory(trajectory_dir, output_dir, cursors):
|
||||
print(f"Error loading image {screenshot_path}: {e}")
|
||||
continue
|
||||
|
||||
# Extract action and position from agent response
|
||||
action_type = extract_action_from_agent_response(turn_path)
|
||||
current_cursor_pos = extract_cursor_position_from_agent_response(turn_path)
|
||||
# Parse agent response
|
||||
agent_response = parse_agent_response(turn_path)
|
||||
|
||||
# Extract thought from agent response
|
||||
current_thought = extract_thought_from_agent_response(turn_path)
|
||||
# Extract action type, cursor position, and thought
|
||||
action_type = agent_response["action_type"]
|
||||
current_cursor_pos = agent_response["cursor_positions"][0] if agent_response["cursor_positions"] else None
|
||||
current_thought = agent_response["thought"]
|
||||
|
||||
# Check if the current frame has an action (click/typing)
|
||||
is_action_frame = action_type in ["clicking", "typing"]
|
||||
@@ -667,7 +611,8 @@ def process_trajectory(trajectory_dir, output_dir, cursors):
|
||||
next_turn_path, next_agent_response_path, next_screenshot_path = turns[current_turn_index + 1]
|
||||
if next_screenshot_path: # Only if next turn has a screenshot
|
||||
# Get next position
|
||||
next_cursor_pos = extract_cursor_position_from_agent_response(next_turn_path)
|
||||
next_agent_response = parse_agent_response(next_turn_path)
|
||||
next_cursor_pos = next_agent_response["cursor_positions"][0] if next_agent_response["cursor_positions"] else None
|
||||
|
||||
# Only interpolate if both positions are valid and different
|
||||
if current_cursor_pos is not None and next_cursor_pos is not None and current_cursor_pos != next_cursor_pos:
|
||||
|
||||
Reference in New Issue
Block a user