From f449005751d5ac699a1b050e9f5bb45398bbca47 Mon Sep 17 00:00:00 2001 From: Dillon DuPont Date: Thu, 1 May 2025 12:07:43 -0700 Subject: [PATCH] code cleanup --- examples/video_maker_traj.py | 123 ++++++++++------------------------- 1 file changed, 34 insertions(+), 89 deletions(-) diff --git a/examples/video_maker_traj.py b/examples/video_maker_traj.py index b9966aa0..552969b6 100644 --- a/examples/video_maker_traj.py +++ b/examples/video_maker_traj.py @@ -69,7 +69,8 @@ last_known_cursor_position = None last_known_thought = None def parse_agent_response(filename_or_turn_dir): - """Parse agent response JSON file to extract text, actions, and cursor positions.""" + """Parse agent response JSON file to extract text, actions, cursor positions, thought, and action type.""" + global last_known_cursor_position, last_known_thought # Check if we're getting a filename or turn directory if os.path.isdir(filename_or_turn_dir): @@ -83,7 +84,9 @@ def parse_agent_response(filename_or_turn_dir): result = { "text": [], "actions": [], - "cursor_positions": [] + "cursor_positions": [], + "thought": None, + "action_type": "normal" } for agent_file in agent_response_files: @@ -125,92 +128,32 @@ def parse_agent_response(filename_or_turn_dir): result["actions"].append(action) # Extract cursor position if available if action.get("x") is not None and action.get("y") is not None: - result["cursor_positions"].append((action.get("x"), action.get("y"))) + position = (action.get("x"), action.get("y")) + result["cursor_positions"].append(position) + last_known_cursor_position = position + + # Determine action type + action_type = action.get("type", "") + if action_type == "click": + result["action_type"] = "clicking" + elif action_type == "type" or action_type == "input": + result["action_type"] = "typing" except Exception as e: print(f"Error processing {agent_file}: {e}") + # Set thought from text if available + if result["text"]: + result["thought"] = ' '.join(result["text"]) + last_known_thought = result["thought"] + else: + result["thought"] = last_known_thought + + # Set cursor position if not found + if not result["cursor_positions"]: + result["cursor_positions"] = [last_known_cursor_position] if last_known_cursor_position else [] + return result -def extract_thought_from_agent_response(filename_or_turn_dir): - """Extract thought from agent response for the current frame.""" - global last_known_thought - - agent_response = parse_agent_response(filename_or_turn_dir) - - if agent_response["text"]: - # Use the first text entry as the thought - last_known_thought = agent_response["text"][0] - return last_known_thought - - # Return the last known thought if no new thought is found - return last_known_thought - -def extract_cursor_position_from_agent_response(filename_or_turn_dir): - """Extract cursor position from agent response.""" - global last_known_cursor_position - - # Check if we're getting a filename or turn directory - if os.path.isdir(filename_or_turn_dir): - turn_dir = filename_or_turn_dir - else: - turn_dir = os.path.dirname(filename_or_turn_dir) - - # Find agent response files in the turn directory - agent_response_files = [f for f in os.listdir(turn_dir) if f.endswith('_agent_response.json')] - - for agent_file in agent_response_files: - try: - with open(os.path.join(turn_dir, agent_file), 'r') as f: - data = json.load(f) - response_data = data.get('response', {}) - - # Process outputs array if present - outputs = response_data.get("output", []) - for output in outputs: - if output.get("type") == "computer_call": - action = output.get("action", {}) - if action.get("x") is not None and action.get("y") is not None: - position = (action.get("x"), action.get("y")) - last_known_cursor_position = position - return position - except Exception as e: - print(f"Error processing {agent_file}: {e}") - - # No position found in agent response, return the last known position - return last_known_cursor_position - -def extract_action_from_agent_response(filename_or_turn_dir): - """Determine the action type from agent response.""" - # Check if we're getting a filename or turn directory - if os.path.isdir(filename_or_turn_dir): - turn_dir = filename_or_turn_dir - else: - turn_dir = os.path.dirname(filename_or_turn_dir) - - # Find agent response files in the turn directory - agent_response_files = [f for f in os.listdir(turn_dir) if f.endswith('_agent_response.json')] - - for agent_file in agent_response_files: - try: - with open(os.path.join(turn_dir, agent_file), 'r') as f: - data = json.load(f) - response_data = data.get('response', {}) - - # Process outputs array if present - outputs = response_data.get("output", []) - for output in outputs: - if output.get("type") == "computer_call": - action = output.get("action", {}) - action_type = action.get("type", "") - if action_type == "click": - return "clicking" - elif action_type == "type" or action_type == "input": - return "typing" - except Exception as e: - print(f"Error processing {agent_file}: {e}") - - return "normal" - def create_animated_vignette(image, frame_index): """ Create an animated purple/blue gradient vignette effect around the border of the image. @@ -588,12 +531,13 @@ def process_trajectory(trajectory_dir, output_dir, cursors): print(f"Error loading image {screenshot_path}: {e}") continue - # Extract action and position from agent response - action_type = extract_action_from_agent_response(turn_path) - current_cursor_pos = extract_cursor_position_from_agent_response(turn_path) + # Parse agent response + agent_response = parse_agent_response(turn_path) - # Extract thought from agent response - current_thought = extract_thought_from_agent_response(turn_path) + # Extract action type, cursor position, and thought + action_type = agent_response["action_type"] + current_cursor_pos = agent_response["cursor_positions"][0] if agent_response["cursor_positions"] else None + current_thought = agent_response["thought"] # Check if the current frame has an action (click/typing) is_action_frame = action_type in ["clicking", "typing"] @@ -667,7 +611,8 @@ def process_trajectory(trajectory_dir, output_dir, cursors): next_turn_path, next_agent_response_path, next_screenshot_path = turns[current_turn_index + 1] if next_screenshot_path: # Only if next turn has a screenshot # Get next position - next_cursor_pos = extract_cursor_position_from_agent_response(next_turn_path) + next_agent_response = parse_agent_response(next_turn_path) + next_cursor_pos = next_agent_response["cursor_positions"][0] if next_agent_response["cursor_positions"] else None # Only interpolate if both positions are valid and different if current_cursor_pos is not None and next_cursor_pos is not None and current_cursor_pos != next_cursor_pos: