"""Synchronous Playwright replayer for recorded browser traces.

A trace is a JSON-lines file with one event dict per line.  The replayer walks
the events in order, applies each one to a Playwright page and raises
``Drift`` when the browser ends up somewhere the recording does not expect.
"""

import asyncio
import json
import logging
import time
from pathlib import Path
from typing import List, Dict, Any, Optional, Literal, Callable
from urllib.parse import urlparse, parse_qs

from playwright.sync_api import (
    Page as SyncPage,
    TimeoutError as SyncPlaywrightTimeoutError,
    Locator as SyncLocator,
    ElementHandle as SyncElementHandle,
)

logger = logging.getLogger(__name__)


# --------------------------------------------------
# Exceptions
# --------------------------------------------------
class Drift(Exception):
    """Raised when replay diverges from expected state."""

    def __init__(self, msg: str, event: Dict[str, Any] | None = None):
        super().__init__(msg)
        # The trace event being verified when the divergence was detected.
        self.event = event


# --------------------------------------------------
# Trace loader helper
# --------------------------------------------------
def load_trace(path: str | Path) -> List[Dict[str, Any]]:
    """Load a JSON-lines trace file into a list of event dicts.

    Blank lines are skipped.  The file is read as UTF-8 (the JSON interchange
    encoding) instead of the platform default encoding.

    Raises:
        OSError: if the file cannot be read.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    lines = Path(path).read_text(encoding="utf-8").splitlines()
    return [json.loads(line) for line in lines if line.strip()]


# --------------------------------------------------
# Replayer
# --------------------------------------------------
class TraceReplayerSync:
    """Replay a list of recorded events against a synchronous Playwright page.

    Clipboard, upload and download events are delegated to ``controller``
    through ``controller.execute(action_name, **kwargs)``.
    """

    BTN_MAP: Dict[str, Literal["left", "middle", "right"]] = {"left": "left", "middle": "middle", "right": "right"}
    MOD_MAP = {"alt": "Alt", "ctrl": "Control", "shift": "Shift", "meta": "Meta"}

    # Substrings of the recorded text / selector that mark a click as
    # "critical"; such clicks get a longer visibility/click timeout.
    _CRITICAL_KEYWORDS = (
        "download", "save", "submit", "next", "continue", "confirm",
        "upload", "add", "create", "process", "generate", "apply",
        "send", "post", "tweet", "run", "execute", "search", "go",
        "login", "signup", "pay", "checkout", "agree", "accept", "allow",
    )
    # Selector fragments (compared against the lower-cased selector) of known
    # critical elements.
    _CRITICAL_SELECTOR_MARKERS = (
        'data-testid="send-button"',
        'data-testid*="submit"',
        'data-testid*="send"',
        'id*="submit-button"',
        'data-testid*="tweetbutton"',
        'id*="composer-submit-button"',  # for chatgpt (example)
    )

    def __init__(self, page: SyncPage, trace: List[Dict[str, Any]], controller: Any, user_provided_files: Optional[List[str]] = None, ui_q: Optional[asyncio.Queue] = None, main_loop: Optional[asyncio.AbstractEventLoop] = None):
        """Store the replay collaborators.

        Args:
            page: Playwright (sync API) page the events are applied to.
            trace: Recorded events, e.g. from ``load_trace``.
            controller: Object exposing ``execute(action_name, **kwargs)``.
            user_provided_files: Local paths offered for ``file_upload`` events.
            ui_q: Optional queue that receives one summary string per event.
            main_loop: Event loop owning ``ui_q``; summaries are posted to it
                with ``call_soon_threadsafe``.
        """
        logger.debug(f"[REPLAYER_SYNC __init__] Initializing. Page: {type(page)}, Trace_events: {len(trace) if trace else 0}, Controller: {type(controller)}")
        self.page = page
        # A missing trace is treated as an empty one so play() is a no-op
        # rather than a TypeError on len(None).
        self.trace = trace if trace is not None else []
        self.controller = controller
        self.user_provided_files = user_provided_files or []  # Store user-provided file paths
        # Outcome of the most recent mouse_click; consulted by _verify().
        self._clicked_with_selector = False
        self._clicked_dispatch = False
        self.ui_q = ui_q
        self.main_loop = main_loop

    # ------------- main loop -------------
    def play(self, speed: float = 2.0):
        """Replay every event of the trace in order.

        Args:
            speed: Divisor applied to each event's recorded delay ``t``
                (milliseconds); values above 1 replay faster than recorded.

        Raises:
            ValueError: if ``speed`` is not positive.
            Drift: if verification detects an unrecoverable URL divergence.
        """
        if speed <= 0:
            # Previously surfaced as a ZeroDivisionError from a log statement.
            raise ValueError(f"speed must be a positive number, got {speed!r}")

        i = 0
        logger.debug(f"[REPLAYER play] Starting play loop. Trace length: {len(self.trace)}")
        while i < len(self.trace):
            ev = self.trace[i]
            logger.debug(f"[REPLAYER play] Processing event {i+1}/{len(self.trace)}: Type: {ev.get('type')}, URL: {ev.get('url')}")

            # Avoid processing 'v' key press before clipboard_paste event:
            # a bare 'v' keystroke immediately followed by a clipboard_paste
            # is dropped and the paste is processed in this iteration.
            if (
                ev.get("type") == "keyboard_input"
                and ev.get("key") == "v"
                and not ev.get("modifiers")
                and (i + 1) < len(self.trace)
            ):
                next_ev = self.trace[i + 1]
                if next_ev.get("type") == "clipboard_paste":
                    logger.info(f"[REPLAYER play] Skipping 'v' key press before clipboard_paste. Event {i+1}")
                    i += 1  # Skip the 'v' key press event
                    ev = next_ev  # Process the clipboard_paste event in this iteration
                    logger.debug(f"[REPLAYER play] Now processing event {i+1}/{len(self.trace)}: Type: {ev.get('type')}, URL: {ev.get('url')}")

            # Send iconic log message to UI queue
            log_msg_str = self._format_event_log(ev)
            if self.ui_q and self.main_loop:
                self.main_loop.call_soon_threadsafe(self.ui_q.put_nowait, log_msg_str)
            else:
                # Fallback to standard logger if queue/loop not provided (e.g. during testing)
                logger.info(log_msg_str)

            # Delay logic ("t" is in milliseconds; a missing/None value means no delay)
            event_delay_ms = ev.get("t") or 0
            logger.debug(f"[REPLAYER play] Event {i+1}: Applying delay of {event_delay_ms}ms, speed adjusted: {event_delay_ms/speed}ms")
            if event_delay_ms > 10:  # Log only if delay is > 10ms (to avoid spamming for 0ms delays)
                logger.debug(f"Pausing for {event_delay_ms/1000.0:.3f}s (speed adjusted: {event_delay_ms/1000.0/speed:.3f}s)")
            time.sleep(event_delay_ms / 1000.0 / speed)

            if ev["type"] == "keyboard_input":
                # _batch_type applies and verifies the keystroke(s) itself.
                consumed = self._batch_type(i)
                i += consumed
                logger.debug(f"[REPLAYER play] Event {i+1-consumed} (keyboard_input batch): Consumed {consumed} events. New index: {i}")
                continue

            self._apply(ev)
            logger.debug(f"[REPLAYER play] Event {i+1}: _apply(ev) completed.")
            self._verify(ev)
            logger.debug(f"[REPLAYER play] Event {i+1}: _verify(ev) completed.")
            i += 1
        logger.debug(f"[REPLAYER play] Play loop finished.")

    @staticmethod
    def _format_event_log(ev: Dict[str, Any]) -> str:
        """Build the short, icon-prefixed one-line summary of an event."""
        log_type = ev["type"]
        current_event_url = ev.get("url", "N/A")  # URL from the event itself
        parts: List[str] = []

        if log_type == "mouse_click":
            parts.append("🖱️ MouseClick")
            button_text = ev.get("text")
            selector = ev.get("selector")
            if button_text:
                parts.append(f'button_text:"{button_text}"')
            elif selector:
                parts.append(f'selector:"{selector}"')
            else:
                parts.append(f"xy:({ev.get('x', 'N/A')},{ev.get('y', 'N/A')})")
            button_type = ev.get("button", "left")
            if button_type != "left":  # Only show if not default left click
                parts.append(f'button:"{button_type}"')
            parts.append(f"url='{current_event_url}'")
        elif log_type == "keyboard_input":
            parts.append("⌨️ KeyInput")
            parts.append(f"key:'{ev.get('key')}'")
            modifiers = ev.get("modifiers")
            if modifiers:
                parts.append(f"mods:{modifiers}")
            parts.append(f"url='{current_event_url}'")
        elif log_type == "navigation":
            parts.append("🌐 Navigation")
            parts.append(f"to='{ev.get('to')}'")
        else:
            # Generic fallback for other event types like scroll, viewport_change etc.
            parts.append(f"{log_type.replace('_', ' ').title()}")
            selector = ev.get("selector")
            if selector:
                parts.append(f'selector:"{selector}"')
            if 'x' in ev and 'y' in ev:
                parts.append(f"coords:({ev.get('x')},{ev.get('y')})")
            parts.append(f"url='{current_event_url}'")

        return ", ".join(parts)

    # ------------- batching -------------
    def _batch_type(self, idx: int) -> int:
        """Apply the keyboard event at ``idx``, batching a run of plain characters.

        When the event is a single unmodified character, following
        ``keyboard_input`` events are merged into one text while they have
        ``t == 0``, the same selector, no modifiers and a single-character
        key.  A run of two or more characters is entered via ``_apply_type``;
        anything else is applied as a single key press.

        Returns:
            The number of trace events consumed (at least 1).
        """
        ev_start_batch = self.trace[idx]
        sel = ev_start_batch.get("selector")
        mods = ev_start_batch.get("modifiers", [])
        first_key = ev_start_batch.get("key") or ""  # tolerate "key": null

        text_to_type = ""
        if len(first_key) == 1 and not mods:
            text_to_type = first_key
            pos = idx + 1
            while pos < len(self.trace):
                nxt = self.trace[pos]
                if nxt["type"] != "keyboard_input" or nxt.get("t", 1) != 0:
                    break
                if nxt.get("selector") != sel or nxt.get("modifiers"):
                    break
                next_key_char = nxt.get("key") or ""
                if len(next_key_char) != 1:
                    break
                text_to_type += next_key_char
                pos += 1

        if len(text_to_type) > 1:
            self._apply_type(sel, text_to_type, [], ev_start_batch)
            self._verify(ev_start_batch)
            # One trace event was merged per character.
            return len(text_to_type)

        self._apply(ev_start_batch)
        self._verify(ev_start_batch)
        return 1

    def _run_with_modifiers(self, mods: Optional[List[str]], action: Callable[[], Any], error_prefix: str) -> None:
        """Hold the mapped modifier keys down while running ``action``.

        Errors raised by ``action`` are logged as
        ``"<error_prefix>: <ExceptionClass> - <message>"`` and swallowed; the
        modifiers are released in reverse order whatever happens.
        """
        mapped_mods = [self.MOD_MAP[m] for m in (mods or []) if m in self.MOD_MAP]
        for m_down in mapped_mods:
            self.page.keyboard.down(m_down)
        try:
            action()
        except Exception as exc:
            logger.error(f"{error_prefix}: {exc.__class__.__name__} - {str(exc)}")
        finally:
            for m_up in reversed(mapped_mods):
                self.page.keyboard.up(m_up)

    def _apply_type(self, sel: Optional[str], text: str, mods: List[str], original_event_for_log: Dict[str, Any]):
        """Enter a batched text run.

        With a selector the first matching element is focused and filled;
        if that fails, or if there is no selector, the text is typed through
        ``page.keyboard.type``.

        NOTE(review): ``Locator.fill`` sets the field value rather than
        appending to it - confirm the recorder never splits the text of one
        field across several batches.
        """
        log_sel_for_type = sel or "N/A"
        logger.debug(f"APPLYING BATCH TYPE: '{text}' -> {log_sel_for_type}")

        def type_text() -> None:
            self.page.keyboard.type(text)

        if sel:
            try:
                element_to_fill = self.page.locator(sel).first
                element_to_fill.wait_for(state='visible', timeout=5000)
                element_to_fill.focus(timeout=1000)
                time.sleep(0.2)  # Short delay after focus before filling
                element_to_fill.fill(text)
            except Exception as e_fill:
                logger.error(f"Error during locator.fill('{text}') for selector '{sel}': {e_fill.__class__.__name__} - {str(e_fill)}. Falling back to keyboard.type.")
                # Fallback to original keyboard.type if fill fails for some reason
                self._run_with_modifiers(mods, type_text, f"Error during fallback page.keyboard.type('{text}')")
        else:
            # If no selector, fallback to general keyboard typing (less common for batched text)
            logger.warning(f"Attempting to batch type '{text}' without a selector. Using page.keyboard.type().")
            self._run_with_modifiers(mods, type_text, f"Error during page.keyboard.type('{text}') without selector")

        logger.debug(f"✅ done BATCH TYPE: '{text}' -> {log_sel_for_type}")

    # ------------- apply -------------
    def _apply(self, ev: Dict[str, Any]):
        """Perform the action described by ``ev`` on the page or controller.

        Handles ``navigation``, ``mouse_click``, ``keyboard_input``,
        ``clipboard_copy``, ``clipboard_paste``, ``file_upload`` and
        ``file_download``; any other event type is only logged.
        """
        typ = ev["type"]
        sel_event = ev.get("selector")
        logger.debug(f"[REPLAYER _apply] Applying action: {typ}, selector: {sel_event}, keys: {ev.get('key')}, to: {ev.get('to')}")
        logger.debug(f"APPLYING ACTION: {typ} for sel={sel_event or 'N/A'}, key={ev.get('key','N/A')}")

        if typ == "navigation":
            self._apply_navigation(ev)
            logger.debug(f"[REPLAYER _apply] Action {typ} applied.")
            return

        if typ == "mouse_click":
            self._apply_mouse_click(ev)
            return

        if typ == "keyboard_input":
            self._apply_key_press(ev)
            return

        if typ == "clipboard_copy":
            logger.debug(f"[REPLAYER _apply] Executing clipboard_copy controller action.")
            self.controller.execute("Copy text to clipboard", text=ev["text"])
            logger.info(f"📋 Executed Copy: text='{(ev['text'][:30] + '...') if len(ev['text']) > 30 else ev['text']}'")
            return

        if typ == "clipboard_paste":
            logger.debug(f"[REPLAYER _apply] Executing clipboard_paste controller action for selector: {ev['selector']}.")
            self.controller.execute("Paste text from clipboard", selector=ev["selector"])
            logger.info(f"📋 Executed Paste into selector='{ev['selector']}'")
            return

        if typ == "file_upload":
            self._apply_file_upload(ev)
            return

        if typ == "file_download":
            # Pass all necessary info from the event to the controller
            logger.debug(f"[REPLAYER _apply] Executing file_download controller action for: {ev.get('suggested_filename')}")
            self.controller.execute(
                "Download remote file",
                url=ev.get("download_url"),  # Original download URL (for info/logging)
                suggested_filename=ev.get("suggested_filename"),
                recorded_local_path=ev.get("recorded_local_path"),  # Path to file saved during recording
                # dest_dir is not passed here; presumably the controller applies its own default.
            )
            logger.info(f"💾 Replay: Executed 'Download remote file' action for: {ev.get('suggested_filename')}")
            return

        logger.debug(f"✅ done {typ} (no specific apply action in this path or already handled by controller.execute)")
        logger.debug(f"[REPLAYER _apply] Action {typ} applied (end of _apply general path).")

    def _apply_navigation(self, ev: Dict[str, Any]) -> None:
        """Navigate to ``ev["to"]`` unless already there, then wait for the page to settle.

        Navigation and load-state failures are logged, not raised; a page
        that ended up elsewhere is caught afterwards by ``_verify``.
        """
        target = ev["to"]
        if not self._url_eq(self.page.url, target):
            logger.debug(f"[REPLAYER _apply NAV] Attempting self.page.goto('{target}')")
            try:
                self.page.goto(target, wait_until="domcontentloaded", timeout=15000)
                logger.debug(f"[REPLAYER _apply NAV] self.page.goto to '{target}' SUCCEEDED.")
            except SyncPlaywrightTimeoutError as pte_goto:
                logger.error(f"[REPLAYER _apply NAV] PlaywrightTimeoutError during goto '{target}': {pte_goto}", exc_info=True)
            except Exception as e_goto_general:
                logger.error(f"[REPLAYER _apply NAV] Exception during goto '{target}': {e_goto_general}", exc_info=True)
        else:
            logger.debug(f"[REPLAYER _apply NAV] Page URL {self.page.url} already matches target {target}. Skipping goto.")

        logger.debug(f"[REPLAYER _apply NAV] Attempting page.bring_to_front() for {target}")
        self.page.bring_to_front()
        logger.debug(f"[REPLAYER _apply NAV] page.bring_to_front() completed for {target}")

        # Enhanced wait after navigation
        try:
            logger.debug(f"Waiting for 'load' state after navigating to {target}")
            logger.debug(f"[REPLAYER _apply NAV] Attempting page.wait_for_load_state('load') for {target}")
            self.page.wait_for_load_state('load', timeout=10000)  # Wait for basic load
            logger.debug(f"[REPLAYER _apply NAV] page.wait_for_load_state('load') completed for {target}")
            logger.debug(f"'load' state confirmed for {target}. Now waiting for networkidle.")
            logger.debug(f"[REPLAYER _apply NAV] Attempting page.wait_for_load_state('networkidle') for {target}")
            self.page.wait_for_load_state('networkidle', timeout=3000)  # Shorter networkidle (e.g., 3 seconds)
            logger.debug(f"[REPLAYER _apply NAV] page.wait_for_load_state('networkidle') completed for {target}")
            logger.debug(f"[REPLAYER _apply NAV] Attempting time.sleep(0.3) for {target}")
            time.sleep(0.3)  # Small buffer
            logger.debug(f"[REPLAYER _apply NAV] time.sleep(0.3) completed for {target}")
            logger.debug(f"Network idle (or timeout) confirmed for {target}")
        except Exception as e_wait:
            logger.warning(f"Timeout or error during page load/networkidle wait on {target}: {e_wait.__class__.__name__} - {str(e_wait)}")
        logger.info(f"✅🌐 Navigated: {target}")

    @classmethod
    def _is_critical_click(cls, recorded_text: str, selector_lower: str) -> bool:
        """Return True when the lower-cased text or selector contains a critical keyword or marker."""
        return (
            any(keyword in recorded_text for keyword in cls._CRITICAL_KEYWORDS)
            or any(keyword in selector_lower for keyword in cls._CRITICAL_KEYWORDS)
            or any(marker in selector_lower for marker in cls._CRITICAL_SELECTOR_MARKERS)
        )

    def _apply_mouse_click(self, ev: Dict[str, Any]) -> None:
        """Replay a click, trying progressively blunter strategies.

        Order: standard locator click, forced locator click, synthetic
        ``click`` event dispatch, and finally a raw mouse click at the
        recorded coordinates.  ``_clicked_with_selector`` and
        ``_clicked_dispatch`` record which strategy succeeded for ``_verify``.
        """
        sel_event = ev.get("selector")
        button = self.BTN_MAP.get(ev.get("button", "left"), "left")
        self._clicked_with_selector = False
        self._clicked_dispatch = False

        if sel_event:
            loc = self._resolve_click_locator(sel_event)
            if loc:
                if self._click_resolved_locator(loc, sel_event, button, ev):
                    return
                # Fallback to dispatchEvent if standard click failed
                if self._dispatch_click(loc, sel_event):
                    return

        # Fallback to XY click if selector-based attempts failed or no selector
        log_x, log_y = ev.get("x"), ev.get("y")
        if log_x is not None and log_y is not None:
            logger.info(f"Fallback: Performing coordinate-based click at ({log_x},{log_y})")
            self.page.mouse.click(log_x, log_y, button=button)
            time.sleep(0.25)
        elif sel_event:
            logger.error(f"All click attempts failed for selector '{sel_event}' and no XY coordinates available.")

    def _click_resolved_locator(self, loc: SyncLocator, sel_event: str, button: str, ev: Dict[str, Any]) -> bool:
        """Wait for ``loc`` to be visible, then click it (standard, then forced).

        Returns True if one of the two clicks succeeded; failures are logged
        and reported as False so the caller can fall back further.
        """
        recorded_text = ev.get("text", "").lower() if ev.get("text") else ""
        wait_timeout = 5000  # Default explicit wait timeout (ms)
        try:
            logger.debug(f"Attempting to click resolved locator for original selector: {sel_event}")
            if self._is_critical_click(recorded_text, sel_event.lower()):
                # Use original recorded text for logging if available, else empty string
                log_text = ev.get('text', '')
                logger.info(f"Critical action suspected (text: '{log_text}', selector: '{sel_event}'). Extending wait.")
                wait_timeout = 15000  # 15 seconds

            logger.debug(f"Waiting for selector '{sel_event}' to be visible and enabled with timeout {wait_timeout}ms.")
            loc.wait_for(state='visible', timeout=wait_timeout)
            loc.scroll_into_view_if_needed(timeout=wait_timeout)
            logger.debug(f"Element '{sel_event}' is visible and enabled. Attempting standard click.")
            logger.debug(f"[REPLAYER _apply CLICK] >>> Attempting loc.click() for '{sel_event}' with timeout {wait_timeout}ms")

            try:
                loc.click(button=button, timeout=wait_timeout, delay=100)
                self._clicked_with_selector = True
                logger.debug(f"[REPLAYER _apply CLICK] loc.click() for '{sel_event}' SUCCEEDED.")
                logger.info(f"Standard Playwright click successful for resolved locator from selector: {sel_event}")
                time.sleep(0.25)  # Keep small delay after successful click
                return True
            except SyncPlaywrightTimeoutError as pte_click:
                logger.warning(f"[REPLAYER _apply CLICK] PlaywrightTimeoutError during standard loc.click() for '{sel_event}': {pte_click}")
            except Exception as e_click:
                logger.warning(f"[REPLAYER _apply CLICK] Exception during standard loc.click() for '{sel_event}': {e_click}", exc_info=True)

            # Fallback 2: Try click with force=True
            logger.debug(f"[REPLAYER _apply CLICK] Fallback 2: Attempting loc.click(force=True) for '{sel_event}'")
            try:
                loc.click(button=button, timeout=wait_timeout, delay=100, force=True)
                self._clicked_with_selector = True
                logger.info(f"Forced Playwright click successful for '{sel_event}'")
                time.sleep(0.25)
                return True
            except SyncPlaywrightTimeoutError as pte_force_click:
                logger.warning(f"[REPLAYER _apply CLICK] PlaywrightTimeoutError during loc.click(force=True) for '{sel_event}': {pte_force_click}")
            except Exception as e_force_click:
                logger.warning(f"[REPLAYER _apply CLICK] Exception during loc.click(force=True) for '{sel_event}': {e_force_click}", exc_info=True)
        except SyncPlaywrightTimeoutError as e_timeout:
            # Raised by wait_for / scroll_into_view_if_needed; fall through to other fallbacks.
            logger.warning(f"Timeout ({wait_timeout}ms) waiting for element '{sel_event}' (visible/enabled) or during click: {e_timeout.__class__.__name__}")
        except Exception as e_click_attempt1:
            logger.warning(f"Standard Playwright click (attempt 1) for resolved locator from '{sel_event}' failed: {e_click_attempt1.__class__.__name__} ({str(e_click_attempt1)})")
        return False

    def _dispatch_click(self, loc: SyncLocator, sel_event: str) -> bool:
        """Dispatch a synthetic ``click`` event on ``loc``; return True on success."""
        try:
            logger.info(f"Fallback 3 (Final): Attempting to dispatch click event for resolved locator from '{sel_event}'")
            logger.debug(f"[REPLAYER _apply CLICK] Fallback 3: Attempting dispatchEvent for '{sel_event}'")
            if loc.count() > 0:
                element_handle = loc.element_handle(timeout=1000)
                if element_handle:
                    element_handle.dispatch_event('click')
                    via = "element_handle"
                else:
                    loc.dispatch_event('click')
                    via = "locator"
                self._clicked_dispatch = True
                self._clicked_with_selector = True
                logger.info(f"DispatchEvent (via {via}) click successful for '{sel_event}'")
                time.sleep(0.25)
                return True
            logger.error(f"Cannot dispatch click for '{sel_event}', resolved locator is empty.")
        except Exception as e_dispatch:
            logger.warning(f"DispatchEvent click failed for '{sel_event}': {e_dispatch.__class__.__name__} ({str(e_dispatch)}). Falling back to XY if available.")
        return False

    def _apply_key_press(self, ev: Dict[str, Any]) -> None:
        """Focus the event's element (best effort) and press one key with its modifiers."""
        key_to_press = ev["key"]
        modifiers_for_press = ev.get("modifiers") or []  # tolerate "modifiers": null
        sel_for_press = ev.get("selector")
        logger.debug(f"APPLYING SINGLE KEY PRESS: '{key_to_press}' (mods: {modifiers_for_press}) -> {sel_for_press or 'no specific target'}")

        if sel_for_press:
            try:
                target_loc_key_press = self.page.locator(sel_for_press).first
                if target_loc_key_press.count() > 0:
                    target_loc_key_press.focus(timeout=800)
                else:
                    logger.warning(f"Target element for key press not found: {sel_for_press}")
            except Exception as e_focus_single_key:
                # The key is still pressed on whatever currently has focus.
                logger.debug(f"Focus failed for selector '{sel_for_press}' during single key press: {e_focus_single_key.__class__.__name__}")

        self._run_with_modifiers(
            modifiers_for_press,
            lambda: self.page.keyboard.press(key_to_press),
            f"Error during page.keyboard.press('{key_to_press}')",
        )
        logger.debug(f"✅ done SINGLE KEY PRESS: '{key_to_press}' -> {sel_for_press or 'no specific target'}")

    def _apply_file_upload(self, ev: Dict[str, Any]) -> None:
        """Resolve a local file for an upload event and hand it to the controller.

        Lookup order: a user-provided file whose name equals the recorded
        ``file_name``, the recorded ``file_path``, then
        ``~/Downloads/<file_name>``.  The upload is skipped (and logged as an
        error) when none of them exists.
        """
        logger.debug(f"[REPLAYER _apply] Processing file_upload for selector: {ev['selector']}, file_name: {ev.get('file_name')}")
        file_path_to_upload = None
        trace_file_name = ev.get("file_name")

        if trace_file_name and self.user_provided_files:
            for user_file_path_str in self.user_provided_files:
                user_file_path = Path(user_file_path_str)
                if user_file_path.name != trace_file_name:
                    continue
                if user_file_path.exists():
                    file_path_to_upload = str(user_file_path)
                    logger.info(f"Using user-provided file for '{trace_file_name}': {file_path_to_upload}")
                    break
                logger.warning(f"User-provided file '{user_file_path_str}' for '{trace_file_name}' does not exist.")

        if not file_path_to_upload:
            trace_event_file_path = ev.get("file_path")  # This is the one from original recording (often empty)
            if trace_event_file_path:
                path_obj = Path(trace_event_file_path).expanduser()
                if path_obj.exists():
                    file_path_to_upload = str(path_obj)
                    logger.info(f"Using file_path from trace for '{trace_file_name or 'unknown'}': {file_path_to_upload}")
                else:
                    logger.warning(f"file_path '{trace_event_file_path}' from trace for '{trace_file_name or 'unknown'}' does not exist.")

        if not file_path_to_upload and trace_file_name:
            fallback_path = Path(f"~/Downloads/{trace_file_name}").expanduser()
            if fallback_path.exists():
                file_path_to_upload = str(fallback_path)
                logger.info(f"Using fallback file for '{trace_file_name}': {file_path_to_upload}")
            else:
                logger.warning(f"Fallback file '{fallback_path}' for '{trace_file_name}' does not exist.")

        if file_path_to_upload:
            logger.debug(f"[REPLAYER _apply] Executing file_upload controller action with path: {file_path_to_upload}")
            self.controller.execute("Upload local file", selector=ev["selector"], file_path=file_path_to_upload)
            logger.info(f"📤 Executed Upload: file='{trace_file_name or Path(file_path_to_upload).name}' (path: '{file_path_to_upload}') to selector='{ev['selector']}'")
        else:
            logger.error(f"Could not determine a valid file path for upload event: {ev}. Skipping upload.")

    def _resolve_click_locator(self, sel: str) -> Optional[SyncLocator]:
        """Resolve a recorded selector to the locator that should receive the click.

        Returns the first match of ``sel`` when it is itself a ``<button>`` or
        has ``role="button"``; otherwise the first button-like element among
        its ancestors-or-self; otherwise the matched element.  Returns None
        when ``sel`` is empty or matches nothing.
        """
        if not sel:
            return None

        # Initial locator based on the selector from the trace
        initial_loc: SyncLocator = self.page.locator(sel).first

        # evaluate() can fail if the element is missing or stale, hence the try.
        try:
            if initial_loc and initial_loc.count() > 0:
                is_button_or_has_role = initial_loc.evaluate(
                    "el => el.tagName === 'BUTTON' || el.getAttribute('role') === 'button'"
                )
                if is_button_or_has_role:
                    logger.debug(f"_resolve_click_locator: Initial selector '{sel}' is already a button or has role='button'. Using it.")
                    return initial_loc
            else:
                logger.debug(f"_resolve_click_locator: Initial selector '{sel}' did not yield any elements. Will try to find ancestor.")
        except Exception as e_eval_initial:
            logger.debug(f"_resolve_click_locator: Error evaluating initial selector '{sel}': {e_eval_initial}. Will try to find ancestor.")

        # `sel` may point at an inner element of a button (e.g. a span), so
        # re-resolve it and look upwards for a button-like element.
        element_loc = self.page.locator(sel).first
        if element_loc.count() > 0:
            # NOTE(review): `.first` takes the first match of the XPath union;
            # if matches come back in document order that is the outermost
            # button-like ancestor rather than the closest one - confirm.
            potential_button_loc = element_loc.locator("xpath=ancestor-or-self::button | ancestor-or-self::*[@role='button']").first
            if potential_button_loc.count() > 0:
                logger.debug(f"_resolve_click_locator: Found button/role=button for '{sel}' via ancestor-or-self. Using it.")
                return potential_button_loc
            logger.debug(f"_resolve_click_locator: No button ancestor found for specific element of '{sel}'. Falling back to initial locator if it exists.")
            return element_loc if element_loc and element_loc.count() > 0 else None

        # If the original selector finds nothing there is nothing to anchor an
        # ancestor search on.
        logger.debug(f"_resolve_click_locator: Initial selector '{sel}' found no elements. Cannot resolve to a button.")
        return None

    # ------------- verify -------------
    def _verify_tweet_posted(self):
        """Wait up to 3s for an alert containing "sent"; log the outcome (never raises)."""
        try:
            self.page.wait_for_selector('[role=alert]:text("sent")', timeout=3000)
            logger.info("Tweet post verification successful: 'sent' toast found.")
        except Exception as e_toast:
            logger.error(f"Tweet post verification failed: 'sent' toast not found within timeout. Error: {e_toast.__class__.__name__}")

    def _next_event(self, ev: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Return the trace event following ``ev``, or None if ``ev`` is the last one.

        ``ev`` is located by identity first: two recorded events can be equal
        as dicts, and an equality search would then resolve to the earlier
        one.  Equality is only a fallback for callers passing a copy.

        Raises:
            ValueError: if ``ev`` is not in the trace at all.
        """
        for position, candidate in enumerate(self.trace):
            if candidate is ev:
                break
        else:
            position = self.trace.index(ev)
        if position + 1 < len(self.trace):
            return self.trace[position + 1]
        return None

    @classmethod
    def _navigation_match(cls, next_event: Dict[str, Any], actual_url: str) -> Optional[str]:
        """Tell whether ``actual_url`` is explained by the upcoming navigation event.

        Returns ``"target"`` if it equals the next navigation's ``to`` URL,
        ``"recorded"`` if it equals the URL that navigation was recorded at,
        and None otherwise (including when the next event is no navigation).
        """
        if next_event.get("type") != "navigation":
            return None
        target_url = next_event.get("to")
        if target_url and cls._url_eq(actual_url, target_url):
            return "target"
        recorded_at_url = next_event.get("url")
        if recorded_at_url and cls._url_eq(actual_url, recorded_at_url):
            return "recorded"
        return None

    def _verify(self, ev: Dict[str, Any]):
        """Check that the page state after ``ev`` matches the recording.

        Raises:
            Drift: when the page URL differs from the expected one and the
                difference is not explained by the next navigation event.
        """
        typ = ev["type"]
        sel_from_event_verify = ev.get("selector")

        if typ == "navigation":
            self._verify_navigation(ev)
            return

        if typ == "mouse_click" and self._clicked_with_selector and sel_from_event_verify:
            self._verify_selector_click(ev, sel_from_event_verify)
            return

        if typ == "keyboard_input":
            try:
                active_element_focused = self.page.evaluate("document.activeElement !== null && document.activeElement !== document.body")
                if not active_element_focused:
                    logger.debug("No specific element has focus after typing for event: %s", ev.get("selector"))
            except Exception as e:
                logger.debug("Error checking active element after typing: %s", e)
            return

        if typ == "mouse_click" and sel_from_event_verify and not self._clicked_with_selector:
            # This implies the fallback XY click was used, which can be a form of drift.
            logger.debug(f"Verification: Click for selector '{ev['selector']}' used XY fallback.")

        self._verify_url(ev)

    def _verify_navigation(self, ev: Dict[str, Any]) -> None:
        """Verify that a navigation landed on its target (or where the next navigation leads)."""
        nav_target_url = ev["to"]
        actual_page_url = self.page.url
        if self._url_eq(actual_page_url, nav_target_url):
            logger.debug(f"Navigation URL verified: Expected target {nav_target_url}, Got {actual_page_url}")
            return

        logger.warning(f"Potential Navigation URL drift: Expected target {nav_target_url}, but current URL is {actual_page_url}. Original event recorded at {ev.get('url')}")
        try:
            next_event = self._next_event(ev)
        except ValueError:
            logger.error("Critical: Could not find current navigation event in trace for drift recovery. Raising drift based on target mismatch.")
            raise Drift(f"URL drift for navigation: expected target {nav_target_url}, got {actual_page_url}", ev)

        if next_event is not None:
            logger.debug(f"Drift check for navigation: Next event is type '{next_event.get('type')}', URL '{next_event.get('url')}', To '{next_event.get('to')}'")
            match = self._navigation_match(next_event, actual_page_url)
            if match == "target":
                logger.info(f"Drift recovery for navigation: Actual URL {actual_page_url} matches TARGET of NEXT navigation. Allowing.")
                return
            if match == "recorded":
                logger.info(f"Drift recovery for navigation: Actual URL {actual_page_url} matches RECORDED URL of NEXT navigation. Allowing.")
                return

        logger.error(f"URL drift CONFIRMED for navigation: expected target {nav_target_url}, got {actual_page_url}")
        raise Drift(f"URL drift for navigation: expected target {nav_target_url}, got {actual_page_url}", ev)

    def _verify_selector_click(self, ev: Dict[str, Any], sel_from_event_verify: str) -> None:
        """Best-effort check after a selector-based click; only logs, never raises."""
        if "tweetButton" in sel_from_event_verify:
            self._verify_tweet_posted()
            return

        if getattr(self, "_clicked_dispatch", False):
            logger.info(f"Verification for selector '{sel_from_event_verify}': Skipped standard DOM check as dispatchEvent was used (element might be detached/changed).")
            return

        recorded_text = ev.get("text")
        if recorded_text is None:
            return
        try:
            verify_loc = self._resolve_click_locator(sel_from_event_verify)
            if verify_loc and verify_loc.count() > 0:
                current_text = (verify_loc.inner_text(timeout=1000)).strip()
                if current_text == recorded_text:
                    logger.info(f"Inner text matched for {sel_from_event_verify}: '{recorded_text}'")
                else:
                    logger.warning(f"Text drift for {sel_from_event_verify}: expected '{recorded_text}', got '{current_text}'")
            else:
                logger.warning(f"Cannot verify text for {sel_from_event_verify}, element not found by re-resolving after click.")
        except Exception as e_text_verify:
            logger.warning(f"Error during text verification for {sel_from_event_verify}: {str(e_text_verify)}")

    def _verify_url(self, ev: Dict[str, Any]) -> None:
        """Compare the page URL with the URL the event was recorded at.

        Events recorded without a ``url`` are skipped: there is nothing to
        compare against.
        """
        current_event_expected_url = ev.get("url")
        if current_event_expected_url is None:
            logger.debug(f"URL verification skipped: event of type '{ev.get('type')}' has no recorded URL.")
            return

        actual_page_url = self.page.url
        if self._url_eq(actual_page_url, current_event_expected_url):
            logger.debug(f"URL verified: Expected {current_event_expected_url}, Got {actual_page_url}")
            return

        logger.warning(f"Potential URL drift: expected {current_event_expected_url} (from event record), got {actual_page_url} (actual browser URL).")
        try:
            next_event = self._next_event(ev)
        except ValueError:
            logger.error("Critical: Could not find current event in trace for drift recovery. This shouldn't happen. Raising original drift.")
            raise Drift(f"URL drift (and event indexing error): expected {current_event_expected_url}, got {actual_page_url}", ev)

        if next_event is not None:
            logger.debug(f"Drift check: Next event is type '{next_event.get('type')}', URL '{next_event.get('url')}', To '{next_event.get('to')}'")
            match = self._navigation_match(next_event, actual_page_url)
            if match == "target":
                # The browser is already AT the target URL of the NEXT navigation event.
                logger.info(f"Drift recovery: Actual URL {actual_page_url} matches TARGET ('to') of the NEXT navigation event. Allowing.")
                return
            if match == "recorded":
                # The browser is AT the URL where the NEXT navigation event was
                # RECORDED (e.g. after a quick redirect chain).
                logger.info(f"Drift recovery: Actual URL {actual_page_url} matches RECORDED URL ('url') of the NEXT navigation event. Allowing.")
                return

        # If no recovery condition met, raise the original drift error
        logger.error(f"URL drift CONFIRMED after checks: expected {current_event_expected_url} (from event record), got {actual_page_url} (actual browser URL).")
        raise Drift(f"URL drift: expected {current_event_expected_url}, got {actual_page_url}", ev)

    # ---------- util ----------
    @staticmethod
    def _url_eq(a, b):
        """Loosely compare two URLs.

        Equal when the hosts match ignoring a leading ``www.``, the paths
        match ignoring trailing slashes, and the ``q``, ``tbm`` and ``hl``
        query parameters match.  Scheme, fragment and all other query
        parameters are ignored.  Returns False if either URL is empty.
        """
        if not a or not b:
            return False
        pa, pb = urlparse(a), urlparse(b)
        # removeprefix, not replace: only a *leading* "www." is insignificant
        # ("awww.example.com" must not collapse to "aexample.com").
        if pa.netloc.removeprefix('www.') != pb.netloc.removeprefix('www.'):
            return False
        if pa.path.rstrip('/') != pb.path.rstrip('/'):
            return False
        KEEP = {'q', 'tbm', 'hl'}
        qa = {k: v for k, v in parse_qs(pa.query).items() if k in KEEP}
        qb = {k: v for k, v in parse_qs(pb.query).items() if k in KEEP}
        return qa == qb


# --------------------------------------------------
# CLI demo (optional)
# --------------------------------------------------
async def _cli_demo(url: str, trace_path: str):
    """Open ``url`` in a visible Chromium and load ``trace_path``.

    The replay itself is currently disabled (see the TODO below); the demo
    only checks that the page opens and the trace file parses.  The browser
    is closed on every exit path.
    """
    from playwright.async_api import async_playwright
    # from src.controller.custom_controller import CustomController  # Async controller

    print("[CLI_DEMO] WARNING: _cli_demo is not yet updated for TraceReplayerSync and CustomControllerSync. Skipping full replay test.", flush=True)
    logger.info(f"CLI Demo: Replaying trace '{trace_path}' starting at URL '{url}'")

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=False)  # Usually headless=False for observing replay
        try:
            # Create a new context for each replay for isolation
            context = await browser.new_context()
            page = await context.new_page()

            # `url` is the very first URL to open before replaying starts; the
            # replayer itself handles navigation events from the trace.
            logger.info(f"CLI Demo: Initial navigation to {url}")
            try:
                await page.goto(url, wait_until="networkidle", timeout=15000)
            except Exception as e_goto:
                logger.warning(f"CLI Demo: Initial goto to {url} failed or timed out: {e_goto}. Attempting to continue replay.")

            try:
                trace_events = load_trace(trace_path)
            except Exception as e_load:
                logger.error(f"CLI Demo: Failed to load trace file {trace_path}: {e_load}")
                return
            if not trace_events:
                logger.error(f"CLI Demo: No events found in trace file: {trace_path}")
                return
            logger.info(f"CLI Demo: Loaded {len(trace_events)} events from {trace_path}")

            # TODO: re-enable once a synchronous controller is available.
            # controller = CustomController()
            # rep = TraceReplayerSync(page, trace_events, controller, user_provided_files=None)
            # try:
            #     rep.play(speed=1)  # 1.0 is real-time, higher is faster
            #     logger.info("✅ CLI Demo: Replay completed")
            # except Drift as d:
            #     logger.error(f"⚠️ CLI Demo: Drift detected during replay: {d}")
            #     if d.event:
            #         logger.error(f"Drift occurred at event: {json.dumps(d.event, indent=2)}")
            # except Exception as e_play:
            #     logger.error(f"💥 CLI Demo: An error occurred during replay: {e_play}", exc_info=True)

            # For now, just close the browser after setup
            print("[CLI_DEMO] Intentionally skipping replay part in CLI demo for now.", flush=True)
        finally:
            await browser.close()


if __name__ == "__main__":
    import sys

    # Ensure correct arguments are provided
    if len(sys.argv) < 3:
        print("Usage: python src/utils/replayer.py <start_url> <trace_path>")
        sys.exit(1)

    # Configure logging for the CLI demo if run directly
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    asyncio.run(_cli_demo(sys.argv[1], sys.argv[2]))