# Spaces: Sleeping (hosting-page status banner captured with the file; not part of the program)
import asyncio, json, logging, time | |
from pathlib import Path | |
from typing import List, Dict, Any, Optional, Literal | |
from urllib.parse import urlparse, parse_qs | |
from playwright.sync_api import Page as SyncPage, TimeoutError as SyncPlaywrightTimeoutError, Locator as SyncLocator, ElementHandle as SyncElementHandle | |
logger = logging.getLogger(__name__) | |
# -------------------------------------------------- | |
# Exceptions | |
# -------------------------------------------------- | |
class Drift(Exception):
    """Signal that the replayed session no longer matches the recorded one.

    Attributes:
        event: The trace event being replayed when the divergence was
            detected, or ``None`` if no event was supplied.
    """

    def __init__(self, msg: str, event: Optional[Dict[str, Any]] = None):
        Exception.__init__(self, msg)
        # Keep the offending event so callers can inspect or report it.
        self.event = event
# -------------------------------------------------- | |
# Trace loader helper | |
# -------------------------------------------------- | |
def load_trace(path: str | Path) -> List[Dict[str, Any]]: | |
return [json.loads(l) for l in Path(path).read_text().splitlines() if l.strip()] | |
# -------------------------------------------------- | |
# Replayer | |
# -------------------------------------------------- | |
class TraceReplayerSync: | |
BTN_MAP: Dict[str, Literal["left", "middle", "right"]] = {"left": "left", "middle": "middle", "right": "right"} | |
MOD_MAP = {"alt": "Alt", "ctrl": "Control", "shift": "Shift", "meta": "Meta"} | |
def __init__(self, page: SyncPage, trace: List[Dict[str, Any]], controller: Any, | |
user_provided_files: Optional[List[str]] = None, | |
ui_q: Optional[asyncio.Queue] = None, | |
main_loop: Optional[asyncio.AbstractEventLoop] = None): | |
logger.debug(f"[REPLAYER_SYNC __init__] Initializing. Page: {type(page)}, Trace_events: {len(trace) if trace else 0}, Controller: {type(controller)}") | |
self.page = page | |
self.trace = trace | |
self.controller = controller | |
self.user_provided_files = user_provided_files or [] # Store user-provided file paths | |
self._clicked_with_selector = False | |
self._clicked_dispatch = False | |
self.ui_q = ui_q | |
self.main_loop = main_loop | |
# ------------- main loop ------------- | |
def play(self, speed: float = 2.0): | |
i = 0 | |
logger.debug(f"[REPLAYER play] Starting play loop. Trace length: {len(self.trace)}") | |
while i < len(self.trace): | |
ev = self.trace[i] | |
logger.debug(f"[REPLAYER play] Processing event {i+1}/{len(self.trace)}: Type: {ev.get('type')}, URL: {ev.get('url')}") | |
# Avoid processing 'v' key press before clipboard_paste event | |
# Check if the current event is 'v' key input and the next is 'clipboard_paste' | |
if ev.get("type") == "keyboard_input" and ev.get("key") == "v" and not ev.get("modifiers"): | |
if (i + 1) < len(self.trace): | |
next_ev = self.trace[i+1] | |
if next_ev.get("type") == "clipboard_paste": | |
logger.info(f"[REPLAYER play] Skipping 'v' key press before clipboard_paste. Event {i+1}") | |
i += 1 # Skip the 'v' key press event | |
ev = next_ev # Process the clipboard_paste event in this iteration | |
logger.debug(f"[REPLAYER play] Now processing event {i+1}/{len(self.trace)}: Type: {ev.get('type')}, URL: {ev.get('url')}") | |
# New concise and iconic log format | |
log_type = ev["type"] | |
current_event_url = ev.get("url", "N/A") # URL from the event itself | |
log_message_elements = [] | |
if log_type == "mouse_click": | |
log_message_elements.append("🖱️ MouseClick") | |
button_text = ev.get("text") | |
selector = ev.get("selector") | |
if button_text: | |
log_message_elements.append(f"button_text:\"{button_text}\"") | |
elif selector: | |
log_message_elements.append(f"selector:\"{selector}\"") | |
else: | |
log_message_elements.append(f"xy:({ev.get('x', 'N/A')},{ev.get('y', 'N/A')})") | |
button_type = ev.get("button", "left") | |
if button_type != "left": # Only show if not default left click | |
log_message_elements.append(f"button:\"{button_type}\"") | |
log_message_elements.append(f"url='{current_event_url}'") | |
elif log_type == "keyboard_input": | |
log_message_elements.append("⌨️ KeyInput") | |
key_val = ev.get("key") | |
log_message_elements.append(f"key:'{key_val}'") | |
modifiers = ev.get("modifiers") | |
if modifiers: | |
log_message_elements.append(f"mods:{modifiers}") | |
log_message_elements.append(f"url='{current_event_url}'") | |
elif log_type == "navigation": | |
log_message_elements.append("🌐 Navigation") | |
to_url = ev.get("to") | |
log_message_elements.append(f"to='{to_url}'") | |
else: # Generic fallback for other event types like scroll, viewport_change etc. | |
log_message_elements.append(f"{log_type.replace('_', ' ').title()}") | |
s = ev.get("selector") | |
if s: log_message_elements.append(f"selector:\"{s}\"") | |
if 'x' in ev and 'y' in ev: | |
log_message_elements.append(f"coords:({ev.get('x')},{ev.get('y')})") | |
log_message_elements.append(f"url='{current_event_url}'") | |
# Send iconic log message to UI queue | |
log_msg_str = ", ".join(log_message_elements) | |
if self.ui_q and self.main_loop: | |
self.main_loop.call_soon_threadsafe(self.ui_q.put_nowait, log_msg_str) | |
else: # Fallback to standard logger if queue/loop not provided (e.g. during testing) | |
logger.info(log_msg_str) | |
# Delay logic | |
logger.debug(f"[REPLAYER play] Event {i+1}: Applying delay of {ev.get('t', 0)}ms, speed adjusted: {ev.get('t', 0)/speed}ms") | |
event_delay_ms = ev.get("t", 0) | |
if event_delay_ms > 10: # Log only if delay is > 10ms (to avoid spamming for 0ms delays) | |
logger.debug(f"Pausing for {event_delay_ms/1000.0:.3f}s (speed adjusted: {event_delay_ms/1000.0/speed:.3f}s)") | |
time.sleep(event_delay_ms / 1000.0 / speed) | |
if ev["type"] == "keyboard_input": | |
consumed = self._batch_type(i) | |
i += consumed | |
logger.debug(f"[REPLAYER play] Event {i+1-consumed} (keyboard_input batch): Consumed {consumed} events. New index: {i}") | |
continue | |
self._apply(ev) | |
logger.debug(f"[REPLAYER play] Event {i+1}: _apply(ev) completed.") | |
self._verify(ev) | |
logger.debug(f"[REPLAYER play] Event {i+1}: _verify(ev) completed.") | |
i += 1 | |
logger.debug(f"[REPLAYER play] Play loop finished.") | |
# ------------- batching ------------- | |
def _batch_type(self, idx: int) -> int: | |
ev_start_batch = self.trace[idx] | |
sel, mods = ev_start_batch.get("selector"), ev_start_batch.get("modifiers", []) | |
text_to_type = "" | |
current_idx_in_trace = idx | |
first_key = ev_start_batch.get("key", "") | |
is_first_key_batchable = len(first_key) == 1 and not mods | |
if is_first_key_batchable: | |
text_to_type = first_key | |
current_idx_in_trace = idx + 1 | |
while current_idx_in_trace < len(self.trace): | |
nxt = self.trace[current_idx_in_trace] | |
if nxt["type"] != "keyboard_input" or nxt.get("t",1) != 0: break | |
if nxt.get("selector") != sel: break | |
if nxt.get("modifiers"): break | |
next_key_char = nxt.get("key", "") | |
if len(next_key_char) == 1: | |
text_to_type += next_key_char | |
current_idx_in_trace += 1 | |
else: | |
break | |
current_idx_in_trace -= 1 | |
num_events_processed = 0 | |
if len(text_to_type) > 1: | |
self._apply_type(sel, text_to_type, [], ev_start_batch) | |
self._verify(ev_start_batch) | |
num_events_processed = current_idx_in_trace - idx + 1 | |
else: | |
self._apply(ev_start_batch) | |
self._verify(ev_start_batch) | |
num_events_processed = 1 | |
return num_events_processed | |
def _apply_type(self, sel: Optional[str], text: str, mods: List[str], original_event_for_log: Dict[str, Any]): | |
log_sel_for_type = sel or "N/A" | |
logger.debug(f"APPLYING BATCH TYPE: '{text}' -> {log_sel_for_type}") | |
if sel: | |
try: | |
element_to_fill = self.page.locator(sel).first | |
element_to_fill.wait_for(state='visible', timeout=5000) | |
element_to_fill.focus(timeout=1000) | |
time.sleep(0.2) # Short delay after focus before filling | |
element_to_fill.fill(text) | |
except Exception as e_fill: | |
logger.error(f"Error during locator.fill('{text}') for selector '{sel}': {e_fill.__class__.__name__} - {str(e_fill)}. Falling back to keyboard.type.") | |
# Fallback to original keyboard.type if fill fails for some reason | |
mapped_mods = [self.MOD_MAP[m] for m in mods if m in self.MOD_MAP] | |
for m_down in mapped_mods: self.page.keyboard.down(m_down) | |
try: | |
self.page.keyboard.type(text) | |
except Exception as e_type: | |
logger.error(f"Error during fallback page.keyboard.type('{text}'): {e_type.__class__.__name__} - {str(e_type)}") | |
for m_up in reversed(mapped_mods): self.page.keyboard.up(m_up) | |
else: | |
# If no selector, fallback to general keyboard typing (less common for batched text) | |
logger.warning(f"Attempting to batch type '{text}' without a selector. Using page.keyboard.type().") | |
mapped_mods = [self.MOD_MAP[m] for m in mods if m in self.MOD_MAP] | |
for m_down in mapped_mods: self.page.keyboard.down(m_down) | |
try: | |
self.page.keyboard.type(text) | |
except Exception as e_type: | |
logger.error(f"Error during page.keyboard.type('{text}') without selector: {e_type.__class__.__name__} - {str(e_type)}") | |
for m_up in reversed(mapped_mods): self.page.keyboard.up(m_up) | |
logger.debug(f"✅ done BATCH TYPE: '{text}' -> {log_sel_for_type}") | |
# ------------- apply ------------- | |
def _apply(self, ev: Dict[str, Any]): | |
typ = ev["type"] | |
sel_event = ev.get("selector") | |
logger.debug(f"[REPLAYER _apply] Applying action: {typ}, selector: {sel_event}, keys: {ev.get('key')}, to: {ev.get('to')}") | |
logger.debug(f"APPLYING ACTION: {typ} for sel={sel_event or 'N/A'}, key={ev.get('key','N/A')}") | |
if typ == "navigation": | |
target = ev["to"] | |
if not self._url_eq(self.page.url, target): | |
logger.debug(f"[REPLAYER _apply NAV] Attempting self.page.goto('{target}')") | |
try: | |
# Restore original navigation target and timeout | |
self.page.goto(target, wait_until="domcontentloaded", timeout=15000) | |
logger.debug(f"[REPLAYER _apply NAV] self.page.goto to '{target}' SUCCEEDED.") | |
except SyncPlaywrightTimeoutError as pte_goto: | |
logger.error(f"[REPLAYER _apply NAV] PlaywrightTimeoutError during goto '{target}': {pte_goto}", exc_info=True) | |
except Exception as e_goto_general: | |
logger.error(f"[REPLAYER _apply NAV] Exception during goto '{target}': {e_goto_general}", exc_info=True) | |
else: | |
logger.debug(f"[REPLAYER _apply NAV] Page URL {self.page.url} already matches target {target}. Skipping goto.") | |
logger.debug(f"[REPLAYER _apply NAV] Attempting page.bring_to_front() for {target}") | |
self.page.bring_to_front() | |
logger.debug(f"[REPLAYER _apply NAV] page.bring_to_front() completed for {target}") | |
# Enhanced wait after navigation | |
try: | |
logger.debug(f"Waiting for 'load' state after navigating to {target}") | |
logger.debug(f"[REPLAYER _apply NAV] Attempting page.wait_for_load_state('load') for {target}") | |
self.page.wait_for_load_state('load', timeout=10000) # Wait for basic load | |
logger.debug(f"[REPLAYER _apply NAV] page.wait_for_load_state('load') completed for {target}") | |
logger.debug(f"'load' state confirmed for {target}. Now waiting for networkidle.") | |
logger.debug(f"[REPLAYER _apply NAV] Attempting page.wait_for_load_state('networkidle') for {target}") | |
self.page.wait_for_load_state('networkidle', timeout=3000) # Shorter networkidle (e.g., 3 seconds) | |
logger.debug(f"[REPLAYER _apply NAV] page.wait_for_load_state('networkidle') completed for {target}") | |
logger.debug(f"[REPLAYER _apply NAV] Attempting time.sleep(0.3) for {target}") | |
time.sleep(0.3) # Small buffer | |
logger.debug(f"[REPLAYER _apply NAV] time.sleep(0.3) completed for {target}") | |
logger.debug(f"Network idle (or timeout) confirmed for {target}") | |
except Exception as e_wait: | |
logger.warning(f"Timeout or error during page load/networkidle wait on {target}: {e_wait.__class__.__name__} - {str(e_wait)}") | |
logger.info(f"✅🌐 Navigated: {target}") | |
logger.debug(f"[REPLAYER _apply] Action {typ} applied.") | |
return | |
if typ == "mouse_click": | |
btn = ev.get("button", "left") | |
recorded_text = ev.get("text", "").lower() if ev.get("text") else "" | |
self._clicked_with_selector = False | |
self._clicked_dispatch = False | |
if sel_event: | |
loc = self._resolve_click_locator(sel_event) | |
if loc: | |
try: | |
logger.debug(f"Attempting to click resolved locator for original selector: {sel_event}") | |
# Default explicit wait timeout | |
wait_timeout = 5000 | |
# Expanded keyword list | |
critical_keywords = [ | |
"download", "save", "submit", "next", "continue", "confirm", "upload", "add", "create", | |
"process", "generate", "apply", "send", "post", "tweet", "run", "execute", | |
"search", "go", "login", "signup", "pay", "checkout", "agree", "accept", "allow" | |
] | |
sel_event_lower = sel_event.lower() if sel_event else "" | |
is_critical_action = False | |
if any(keyword in recorded_text for keyword in critical_keywords): | |
is_critical_action = True | |
elif sel_event_lower and any(keyword in sel_event_lower for keyword in critical_keywords): | |
is_critical_action = True | |
# Specific checks for known critical element identifiers | |
if sel_event_lower and ( | |
'data-testid="send-button"' in sel_event_lower or | |
'data-testid*="submit"' in sel_event_lower or | |
'data-testid*="send"' in sel_event_lower or | |
'id*="submit-button"' in sel_event_lower or | |
'data-testid*="tweetbutton"' in sel_event_lower or | |
'id*="composer-submit-button"' in sel_event_lower # for chatgpt (example) | |
): | |
is_critical_action = True | |
if is_critical_action: | |
# Use original recorded text for logging if available, else empty string | |
log_text = ev.get('text', '') | |
logger.info(f"Critical action suspected (text: '{log_text}', selector: '{sel_event}'). Extending wait.") | |
wait_timeout = 15000 # 15 seconds | |
logger.debug(f"Waiting for selector '{sel_event}' to be visible and enabled with timeout {wait_timeout}ms.") | |
loc.wait_for(state='visible', timeout=wait_timeout) | |
loc.scroll_into_view_if_needed(timeout=wait_timeout) | |
logger.debug(f"Element '{sel_event}' is visible and enabled. Attempting standard click.") | |
print(f"[REPLAYER _apply CLICK] >>> Attempting loc.click() for '{sel_event}' with timeout {wait_timeout}ms", flush=True) | |
try: | |
loc.click(button=self.BTN_MAP.get(btn, "left"), timeout=wait_timeout, delay=100) | |
self._clicked_with_selector = True | |
logger.debug(f"[REPLAYER _apply CLICK] loc.click() for '{sel_event}' SUCCEEDED.") | |
logger.info(f"Standard Playwright click successful for resolved locator from selector: {sel_event}") | |
time.sleep(0.25) # Keep small delay after successful click | |
return # Successfully clicked | |
except SyncPlaywrightTimeoutError as pte_click: | |
logger.warning(f"[REPLAYER _apply CLICK] PlaywrightTimeoutError during standard loc.click() for '{sel_event}': {pte_click}") | |
except Exception as e_click: | |
logger.warning(f"[REPLAYER _apply CLICK] Exception during standard loc.click() for '{sel_event}': {e_click}", exc_info=True) | |
# Fallback 2: Try click with force=True | |
if not self._clicked_with_selector: | |
logger.debug(f"[REPLAYER _apply CLICK] Fallback 2: Attempting loc.click(force=True) for '{sel_event}'") | |
try: | |
loc.click(button=self.BTN_MAP.get(btn, "left"), timeout=wait_timeout, delay=100, force=True) | |
self._clicked_with_selector = True | |
logger.info(f"Forced Playwright click successful for '{sel_event}'") | |
time.sleep(0.25) | |
return | |
except SyncPlaywrightTimeoutError as pte_force_click: | |
logger.warning(f"[REPLAYER _apply CLICK] PlaywrightTimeoutError during loc.click(force=True) for '{sel_event}': {pte_force_click}") | |
except Exception as e_force_click: | |
logger.warning(f"[REPLAYER _apply CLICK] Exception during loc.click(force=True) for '{sel_event}': {e_force_click}", exc_info=True) | |
except SyncPlaywrightTimeoutError as e_timeout: | |
logger.warning(f"Timeout ({wait_timeout}ms) waiting for element '{sel_event}' (visible/enabled) or during click: {e_timeout.__class__.__name__}") | |
# Fall through to other fallbacks if timeout | |
except Exception as e_click_attempt1: | |
logger.warning(f"Standard Playwright click (attempt 1) for resolved locator from '{sel_event}' failed: {e_click_attempt1.__class__.__name__} ({str(e_click_attempt1)})") | |
# Fallback to dispatchEvent if standard click failed (and not returned) | |
if not self._clicked_with_selector: | |
try: | |
logger.info(f"Fallback 3 (Final): Attempting to dispatch click event for resolved locator from '{sel_event}'") | |
logger.debug(f"[REPLAYER _apply CLICK] Fallback 3: Attempting dispatchEvent for '{sel_event}'") | |
if loc.count() > 0: | |
element_handle = loc.element_handle(timeout=1000) | |
if element_handle: | |
element_handle.dispatch_event('click') | |
self._clicked_dispatch = True | |
self._clicked_with_selector = True | |
logger.info(f"DispatchEvent (via element_handle) click successful for '{sel_event}'") | |
time.sleep(0.25) | |
return | |
else: | |
loc.dispatch_event('click') | |
self._clicked_dispatch = True | |
self._clicked_with_selector = True | |
logger.info(f"DispatchEvent (via locator) click successful for '{sel_event}'") | |
time.sleep(0.25) | |
return | |
else: | |
logger.error(f"Cannot dispatch click for '{sel_event}', resolved locator is empty.") | |
except Exception as e_dispatch: | |
logger.warning(f"DispatchEvent click failed for '{sel_event}': {e_dispatch.__class__.__name__} ({str(e_dispatch)}). Falling back to XY if available.") | |
# Fallback to XY click if selector-based attempts failed or no selector | |
if not self._clicked_with_selector: | |
log_x, log_y = ev.get("x"), ev.get("y") | |
if log_x is not None and log_y is not None: | |
logger.info(f"Fallback: Performing coordinate-based click at ({log_x},{log_y})") | |
self.page.mouse.click(log_x, log_y, button=self.BTN_MAP.get(btn, "left")) | |
time.sleep(0.25) | |
else: | |
if sel_event: | |
logger.error(f"All click attempts failed for selector '{sel_event}' and no XY coordinates available.") | |
return | |
if typ == "keyboard_input": | |
key_to_press = ev["key"] | |
modifiers_for_press = ev.get("modifiers", []) # REVERTED to 'modifiers' | |
sel_for_press = ev.get("selector") | |
logger.debug(f"APPLYING SINGLE KEY PRESS: '{key_to_press}' (mods: {modifiers_for_press}) -> {sel_for_press or 'no specific target'}") | |
if sel_for_press: | |
try: | |
target_loc_key_press = self.page.locator(sel_for_press).first | |
if target_loc_key_press.count() > 0: | |
target_loc_key_press.focus(timeout=800) | |
else: | |
logger.warning(f"Target element for key press not found: {sel_for_press}") | |
except Exception as e_focus_single_key: | |
logger.debug(f"Focus failed for selector '{sel_for_press}' during single key press: {e_focus_single_key.__class__.__name__}") | |
mapped_mods_press = [self.MOD_MAP[m] for m in modifiers_for_press if m in self.MOD_MAP] | |
for m_down_key in mapped_mods_press: self.page.keyboard.down(m_down_key) | |
try: | |
self.page.keyboard.press(key_to_press) | |
except Exception as e_press: | |
logger.error(f"Error during page.keyboard.press('{key_to_press}'): {e_press.__class__.__name__} - {str(e_press)}") | |
for m_up_key in reversed(mapped_mods_press): self.page.keyboard.up(m_up_key) | |
logger.debug(f"✅ done SINGLE KEY PRESS: '{key_to_press}' -> {sel_for_press or 'no specific target'}") | |
return | |
# --- NEW EVENT HANDLERS --- | |
elif typ == "clipboard_copy": | |
logger.debug(f"[REPLAYER _apply] Executing clipboard_copy controller action.") | |
self.controller.execute("Copy text to clipboard", text=ev["text"]) | |
logger.info(f"📋 Executed Copy: text='{(ev['text'][:30] + '...') if len(ev['text']) > 30 else ev['text']}'") | |
return | |
elif typ == "clipboard_paste": | |
logger.debug(f"[REPLAYER _apply] Executing clipboard_paste controller action for selector: {ev['selector']}.") | |
self.controller.execute("Paste text from clipboard", selector=ev["selector"]) | |
logger.info(f"📋 Executed Paste into selector='{ev['selector']}'") | |
return | |
elif typ == "file_upload": | |
logger.debug(f"[REPLAYER _apply] Processing file_upload for selector: {ev['selector']}, file_name: {ev.get('file_name')}") | |
file_path_to_upload = None | |
trace_file_name = ev.get("file_name") | |
if trace_file_name and self.user_provided_files: | |
for user_file_path_str in self.user_provided_files: | |
user_file_path = Path(user_file_path_str) | |
if user_file_path.name == trace_file_name: | |
if user_file_path.exists(): | |
file_path_to_upload = str(user_file_path) | |
logger.info(f"Using user-provided file for '{trace_file_name}': {file_path_to_upload}") | |
break | |
else: | |
logger.warning(f"User-provided file '{user_file_path_str}' for '{trace_file_name}' does not exist.") | |
if not file_path_to_upload: | |
trace_event_file_path = ev.get("file_path") # This is the one from original recording (often empty) | |
if trace_event_file_path: | |
path_obj = Path(trace_event_file_path).expanduser() | |
if path_obj.exists(): | |
file_path_to_upload = str(path_obj) | |
logger.info(f"Using file_path from trace for '{trace_file_name or 'unknown'}': {file_path_to_upload}") | |
else: | |
logger.warning(f"file_path '{trace_event_file_path}' from trace for '{trace_file_name or 'unknown'}' does not exist.") | |
if not file_path_to_upload and trace_file_name: | |
fallback_path = Path(f"~/Downloads/{trace_file_name}").expanduser() | |
if fallback_path.exists(): | |
file_path_to_upload = str(fallback_path) | |
logger.info(f"Using fallback file for '{trace_file_name}': {file_path_to_upload}") | |
else: | |
logger.warning(f"Fallback file '{fallback_path}' for '{trace_file_name}' does not exist.") | |
if file_path_to_upload: | |
logger.debug(f"[REPLAYER _apply] Executing file_upload controller action with path: {file_path_to_upload}") | |
self.controller.execute("Upload local file", | |
selector=ev["selector"], | |
file_path=file_path_to_upload) | |
logger.info(f"📤 Executed Upload: file='{trace_file_name or Path(file_path_to_upload).name}' (path: '{file_path_to_upload}') to selector='{ev['selector']}'") | |
else: | |
logger.error(f"Could not determine a valid file path for upload event: {ev}. Skipping upload.") | |
return | |
elif typ == "file_download": | |
# Pass all necessary info from the event to the controller | |
logger.debug(f"[REPLAYER _apply] Executing file_download controller action for: {ev.get('suggested_filename')}") | |
self.controller.execute( | |
"Download remote file", | |
url=ev.get("download_url"), # Original download URL (for info/logging) | |
suggested_filename=ev.get("suggested_filename"), | |
recorded_local_path=ev.get("recorded_local_path") # Path to file saved during recording | |
# dest_dir will be handled by CustomController.execute with its default if not present here | |
) | |
logger.info(f"💾 Replay: Executed 'Download remote file' action for: {ev.get('suggested_filename')}") | |
return | |
# --- END NEW EVENT HANDLERS --- | |
logger.debug(f"✅ done {typ} (no specific apply action in this path or already handled by controller.execute)") | |
logger.debug(f"[REPLAYER _apply] Action {typ} applied (end of _apply general path).") | |
def _resolve_click_locator(self, sel: str) -> Optional[SyncLocator]: | |
if not sel: return None | |
# Initial locator based on the selector from the trace | |
initial_loc: SyncLocator = self.page.locator(sel).first | |
# Check if the initial locator itself is a button or has role="button" | |
# Use a try-catch for evaluate as the element might not exist or be stale | |
try: | |
if initial_loc and initial_loc.count() > 0: # Ensure element exists before evaluation | |
# Check if the element itself is a button or has role="button" | |
is_button_or_has_role = initial_loc.evaluate( | |
"el => el.tagName === 'BUTTON' || el.getAttribute('role') === 'button'" | |
) | |
if is_button_or_has_role: | |
logger.debug(f"_resolve_click_locator: Initial selector '{sel}' is already a button or has role='button'. Using it.") | |
return initial_loc | |
else: | |
logger.debug(f"_resolve_click_locator: Initial selector '{sel}' did not yield any elements. Will try to find ancestor.") | |
# If initial_loc.count() is 0, initial_loc might not be suitable for ancestor search directly, | |
# but Playwright handles this by searching from the page if the locator is empty. | |
# However, it's cleaner to ensure we have a starting point if we intend to find an ancestor *of something*. | |
# For now, we will proceed, and if initial_loc is empty, the ancestor search becomes a page-wide search for a button. | |
except Exception as e_eval_initial: | |
logger.debug(f"_resolve_click_locator: Error evaluating initial selector '{sel}': {e_eval_initial}. Will try to find ancestor.") | |
# If not, or if initial check failed, try to find an ancestor that is a button or has role="button" | |
# This also covers cases where `sel` might point to an inner element of a button (e.g., a span). | |
# The XPath searches for an ancestor OR self that is a button or has the role. | |
# Using a more specific XPath to find the closest ancestor or self that is a button: | |
# xpath=ancestor-or-self::button | ancestor-or-self::*[@role='button'] | |
# Playwright's loc.locator("xpath=...") will find the first such element from the perspective of `loc`. | |
# If initial_loc was empty, this effectively searches from page root. | |
# Let's try a slightly different approach for finding the button: use Playwright's :nth-match with a broader internal selector. | |
# This attempts to find the *actual element* matching 'sel', then looks upwards or at itself for a button. | |
# This is more robust if 'sel' is very specific to an inner element. | |
# Re-fetch the initial locator to ensure we are working from the element pointed to by `sel` | |
# This is important if `sel` is like 'div > span' - we want the span, then find its button parent. | |
# If initial_loc.count() was 0 above, this will still be an empty locator. | |
element_loc = self.page.locator(sel).first | |
if element_loc.count() > 0: | |
# Try to find a button by looking at the element itself or its ancestors | |
# This combines checking self and then ascending. | |
# The XPath 'ancestor-or-self::button | ancestor-or-self::*[@role="button"]' correctly finds the button. | |
# We then take the .first of these, as Playwright will return them in document order (ancestors first). | |
# To get the *closest* (most specific) button, we might need to be careful. | |
# However, Playwright's .locator on an existing locator usually chains correctly. | |
# Let's try to find the *specific* element by `sel` and then chain to find its button ancestor or self. | |
# This is more reliable than a broad page search if `sel` is specific. | |
potential_button_loc = element_loc.locator("xpath=ancestor-or-self::button | ancestor-or-self::*[@role='button']").first | |
if potential_button_loc.count() > 0: | |
logger.debug(f"_resolve_click_locator: Found button/role=button for '{sel}' via ancestor-or-self. Using it.") | |
return potential_button_loc | |
else: | |
logger.debug(f"_resolve_click_locator: No button ancestor found for specific element of '{sel}'. Falling back to initial locator if it exists.") | |
return element_loc if element_loc and element_loc.count() > 0 else None | |
else: | |
# If the original selector `sel` finds nothing, try a page-wide search for a button that might contain the text from `sel` if `sel` was text-based | |
# This part is tricky and heuristic. For now, if `sel` finds nothing, we return None. | |
logger.debug(f"_resolve_click_locator: Initial selector '{sel}' found no elements. Cannot resolve to a button.") | |
return None | |
# ------------- verify ------------- | |
def _verify_tweet_posted(self): | |
try: | |
self.page.wait_for_selector('[role=alert]:text("sent")', timeout=3000) | |
logger.info("Tweet post verification successful: 'sent' toast found.") | |
except Exception as e_toast: | |
logger.error(f"Tweet post verification failed: 'sent' toast not found within timeout. Error: {e_toast.__class__.__name__}") | |
def _verify(self, ev: Dict[str, Any]): | |
typ = ev["type"] | |
sel_from_event_verify = ev.get("selector") | |
if typ == "navigation": | |
if not TraceReplayerSync._url_eq(self.page.url, ev["to"]): | |
current_event_expected_url = ev["url"] | |
nav_target_url = ev["to"] | |
actual_page_url = self.page.url | |
if TraceReplayerSync._url_eq(actual_page_url, nav_target_url): | |
logger.debug(f"Navigation URL verified: Expected target {nav_target_url}, Got {actual_page_url}") | |
return | |
logger.warning(f"Potential Navigation URL drift: Expected target {nav_target_url}, but current URL is {actual_page_url}. Original event recorded at {current_event_expected_url}") | |
current_event_index = -1 | |
try: | |
current_event_index = self.trace.index(ev) | |
except ValueError: | |
logger.error("Critical: Could not find current navigation event in trace for drift recovery. Raising drift based on target mismatch.") | |
raise Drift(f"URL drift for navigation: expected target {nav_target_url}, got {actual_page_url}", ev) | |
if 0 <= current_event_index < len(self.trace) - 1: | |
next_event = self.trace[current_event_index + 1] | |
logger.debug(f"Drift check for navigation: Next event is type '{next_event.get('type')}', URL '{next_event.get('url')}', To '{next_event.get('to')}'") | |
if next_event.get("type") == "navigation": | |
next_event_nav_target_url = next_event.get("to") | |
next_event_recorded_at_url = next_event.get("url") | |
if next_event_nav_target_url and TraceReplayerSync._url_eq(actual_page_url, next_event_nav_target_url): | |
logger.info(f"Drift recovery for navigation: Actual URL {actual_page_url} matches TARGET of NEXT navigation. Allowing.") | |
return | |
if next_event_recorded_at_url and TraceReplayerSync._url_eq(actual_page_url, next_event_recorded_at_url): | |
logger.info(f"Drift recovery for navigation: Actual URL {actual_page_url} matches RECORDED URL of NEXT navigation. Allowing.") | |
return | |
logger.error(f"URL drift CONFIRMED for navigation: expected target {nav_target_url}, got {actual_page_url}") | |
raise Drift(f"URL drift for navigation: expected target {nav_target_url}, got {actual_page_url}", ev) | |
return | |
if typ == "mouse_click" and self._clicked_with_selector and sel_from_event_verify: | |
if "tweetButton" in sel_from_event_verify: | |
self._verify_tweet_posted() | |
return | |
if getattr(self, "_clicked_dispatch", False): | |
logger.info(f"Verification for selector '{sel_from_event_verify}': Skipped standard DOM check as dispatchEvent was used (element might be detached/changed).") | |
return | |
recorded_text = ev.get("text") | |
if recorded_text is not None: | |
try: | |
verify_loc = self._resolve_click_locator(sel_from_event_verify) | |
if verify_loc and verify_loc.count() > 0: | |
current_text = (verify_loc.inner_text(timeout=1000)).strip() | |
if current_text == recorded_text: | |
logger.info(f"Inner text matched for {sel_from_event_verify}: '{recorded_text}'") | |
else: | |
logger.warning(f"Text drift for {sel_from_event_verify}: expected '{recorded_text}', got '{current_text}'") | |
else: | |
logger.warning(f"Cannot verify text for {sel_from_event_verify}, element not found by re-resolving after click.") | |
except Exception as e_text_verify: | |
logger.warning(f"Error during text verification for {sel_from_event_verify}: {str(e_text_verify)}") | |
return | |
if typ == "keyboard_input": | |
try: | |
active_element_focused = self.page.evaluate("document.activeElement !== null && document.activeElement !== document.body") | |
if not active_element_focused: | |
logger.debug("No specific element has focus after typing for event: %s", ev.get("selector")) | |
except Exception as e: | |
logger.debug("Error checking active element after typing: %s", e) | |
return | |
# Selector verification (if applicable) | |
# This part remains unchanged from your existing logic if you have it. | |
# For example, if a click was supposed to happen on a selector: | |
if ev["type"] == "mouse_click" and ev.get("selector") and not self._clicked_with_selector: | |
# This implies the fallback XY click was used, which can be a form of drift. | |
# You might want to log this or handle it as a minor drift. | |
logger.debug(f"Verification: Click for selector '{ev['selector']}' used XY fallback.") | |
# URL drift check | |
current_event_expected_url = ev["url"] | |
actual_page_url = self.page.url | |
if not TraceReplayerSync._url_eq(actual_page_url, current_event_expected_url): | |
logger.warning(f"Potential URL drift: expected {current_event_expected_url} (from event record), got {actual_page_url} (actual browser URL).") | |
current_event_index = -1 | |
try: | |
# Find the index of the current event 'ev' in self.trace | |
# This is okay for moderately sized traces. Consider passing index if performance becomes an issue. | |
current_event_index = self.trace.index(ev) | |
except ValueError: | |
logger.error("Critical: Could not find current event in trace for drift recovery. This shouldn't happen. Raising original drift.") | |
raise Drift(f"URL drift (and event indexing error): expected {current_event_expected_url}, got {actual_page_url}", ev) | |
if 0 <= current_event_index < len(self.trace) - 1: | |
next_event = self.trace[current_event_index + 1] | |
logger.debug(f"Drift check: Next event is type '{next_event.get('type')}', URL '{next_event.get('url')}', To '{next_event.get('to')}'") | |
if next_event.get("type") == "navigation": | |
next_event_target_url = next_event.get("to") | |
next_event_recorded_at_url = next_event.get("url") | |
# Condition 1: The browser is AT the target URL of the NEXT navigation event. | |
# This means the current navigation (ev) effectively led to where next_event will go. | |
if next_event_target_url and TraceReplayerSync._url_eq(actual_page_url, next_event_target_url): | |
logger.info(f"Drift recovery: Actual URL {actual_page_url} matches TARGET ('to') of the NEXT navigation event. Allowing.") | |
return | |
# Condition 2: The browser is AT the URL where the NEXT navigation event was RECORDED. | |
# This means the current navigation (ev) might have been part of a quick redirect chain, | |
# and the page has landed on the 'url' from which the next_event was initiated. | |
# This is relevant if next_event_target_url is different from next_event_recorded_at_url | |
if next_event_recorded_at_url and TraceReplayerSync._url_eq(actual_page_url, next_event_recorded_at_url): | |
logger.info(f"Drift recovery: Actual URL {actual_page_url} matches RECORDED URL ('url') of the NEXT navigation event. Allowing.") | |
return | |
# If no recovery condition met, raise the original drift error | |
logger.error(f"URL drift CONFIRMED after checks: expected {current_event_expected_url} (from event record), got {actual_page_url} (actual browser URL).") | |
raise Drift(f"URL drift: expected {current_event_expected_url}, got {actual_page_url}", ev) | |
else: | |
logger.debug(f"URL verified: Expected {current_event_expected_url}, Got {actual_page_url}") | |
# ---------- util ---------- | |
def _url_eq(a, b):
    """Return True when URLs ``a`` and ``b`` count as the same page for drift checks.

    The comparison is intentionally loose. Only three things are compared:

    * the network location, with a single leading ``www.`` ignored;
    * the path, with trailing slashes ignored;
    * the query parameters listed in ``KEEP`` (all other parameters are ignored).

    Scheme, fragment and any other URL component are not compared.

    Takes no ``self``; it is invoked as ``TraceReplayerSync._url_eq(a, b)``.

    Args:
        a: First URL; may be empty or ``None``.
        b: Second URL; may be empty or ``None``.

    Returns:
        ``False`` if either URL is falsy or a compared component differs,
        otherwise ``True``.
    """
    if not a or not b:
        return False

    pa, pb = urlparse(a), urlparse(b)

    # Strip only a *leading* "www.". A plain str.replace would also delete
    # "www." from the middle of a host (e.g. "awww.example.com" would be
    # treated as "aexample.com"), producing false matches.
    if pa.netloc.removeprefix("www.") != pb.netloc.removeprefix("www."):
        return False

    if pa.path.rstrip("/") != pb.path.rstrip("/"):
        return False

    # Only these query parameters take part in the comparison; everything
    # else in the query string is ignored.
    KEEP = {"q", "tbm", "hl"}
    qa = {k: v for k, v in parse_qs(pa.query).items() if k in KEEP}
    qb = {k: v for k, v in parse_qs(pb.query).items() if k in KEEP}
    return qa == qb
# -------------------------------------------------- | |
# CLI demo (optional) | |
# -------------------------------------------------- | |
async def _cli_demo(url: str, trace_path: str) -> None:
    """Open ``url`` in a headed Chromium and load the trace at ``trace_path``.

    The replay step is currently disabled: this demo has not been updated
    for ``TraceReplayerSync`` / ``CustomControllerSync`` yet, so it only
    performs the setup (browser launch, initial navigation, trace loading)
    and then closes the browser.

    Args:
        url: Page opened before any replay would start. A failed or
            timed-out navigation is logged and does not abort the demo.
        trace_path: Path handed to ``load_trace``. A load error or an empty
            trace is logged and ends the demo early.
    """
    from playwright.async_api import async_playwright
    # from src.controller.custom_controller import CustomController  # Async controller

    print("[CLI_DEMO] WARNING: _cli_demo is not yet updated for TraceReplayerSync and CustomControllerSync. Skipping full replay test.", flush=True)
    logger.info(f"CLI Demo: Replaying trace '{trace_path}' starting at URL '{url}'")

    async with async_playwright() as pw:
        # Headed mode so the session can be observed.
        browser = await pw.chromium.launch(headless=False)
        try:
            # Fresh context per run for isolation.
            context = await browser.new_context()
            page = await context.new_page()

            # `url` is only the very first page to open; navigation events
            # inside the trace are meant to be handled by the replayer.
            logger.info(f"CLI Demo: Initial navigation to {url}")
            try:
                await page.goto(url, wait_until="networkidle", timeout=15000)
            except Exception as e_goto:
                # Best effort: keep going even if the first page is slow or fails.
                logger.warning(f"CLI Demo: Initial goto to {url} failed or timed out: {e_goto}. Attempting to continue replay.")

            # Keep the try body limited to the load itself so that unrelated
            # failures are not misreported as trace-loading errors.
            try:
                trace_events = load_trace(trace_path)
            except Exception as e_load:
                logger.error(f"CLI Demo: Failed to load trace file {trace_path}: {e_load}")
                return

            if not trace_events:
                logger.error(f"CLI Demo: No events found in trace file: {trace_path}")
                return
            logger.info(f"CLI Demo: Loaded {len(trace_events)} events from {trace_path}")

            # TODO: re-enable the replay once a sync controller is available:
            #   rep = TraceReplayerSync(page, trace_events, controller, user_provided_files=None)
            #   rep.play(speed=1)  # 1.0 is real-time, higher is faster
            # and log Drift (including d.event) and any other exception
            # instead of letting them escape.
            print("[CLI_DEMO] Intentionally skipping replay part in CLI demo for now.", flush=True)
        finally:
            # Single cleanup point: runs on normal completion, on the early
            # returns above, and when setup raises.
            await browser.close()
if __name__ == "__main__":
    import sys
    import asyncio as _a

    # Two positional arguments are required: the start URL and the trace file.
    # Anything after them is ignored.
    cli_args = sys.argv[1:]
    if len(cli_args) < 2:
        print("Usage: python src/utils/replayer.py <start_url> <path_to_trace_file.jsonl>")
        sys.exit(1)
    start_url, trace_file = cli_args[0], cli_args[1]

    # Logging is configured here only for direct script execution.
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    _a.run(_cli_demo(start_url, trace_file))