Spaces:
Sleeping
Sleeping
File size: 46,858 Bytes
94ff58a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 |
import asyncio, json, logging, time
from pathlib import Path
from typing import List, Dict, Any, Optional, Literal
from urllib.parse import urlparse, parse_qs
from playwright.sync_api import Page as SyncPage, TimeoutError as SyncPlaywrightTimeoutError, Locator as SyncLocator, ElementHandle as SyncElementHandle
logger = logging.getLogger(__name__)
# --------------------------------------------------
# Exceptions
# --------------------------------------------------
class Drift(Exception):
"""Raised when replay diverges from expected state."""
def __init__(self, msg: str, event: Dict[str, Any] | None = None):
super().__init__(msg)
self.event = event
# --------------------------------------------------
# Trace loader helper
# --------------------------------------------------
def load_trace(path: str | Path) -> List[Dict[str, Any]]:
return [json.loads(l) for l in Path(path).read_text().splitlines() if l.strip()]
# --------------------------------------------------
# Replayer
# --------------------------------------------------
class TraceReplayerSync:
    """Replay a recorded list of event dicts against a synchronous Playwright page.

    ``play`` walks the trace in order; each event is applied to the page (or
    forwarded to the controller's ``execute`` for clipboard, upload and
    download events) and then verified.
    """
    # Recorded mouse-button name -> button value passed to Playwright click calls;
    # lookups fall back to "left" for unknown names.
    BTN_MAP: Dict[str, Literal["left", "middle", "right"]] = {"left": "left", "middle": "middle", "right": "right"}
    # Recorded modifier name -> key name used with page.keyboard.down()/up().
    MOD_MAP = {"alt": "Alt", "ctrl": "Control", "shift": "Shift", "meta": "Meta"}
def __init__(self, page: SyncPage, trace: List[Dict[str, Any]], controller: Any,
             user_provided_files: Optional[List[str]] = None,
             ui_q: Optional[asyncio.Queue] = None,
             main_loop: Optional[asyncio.AbstractEventLoop] = None):
    """Store the replay collaborators and reset the per-click bookkeeping flags.

    Args:
        page: Playwright page the events are replayed on.
        trace: Recorded events, one dict per event.
        controller: Object whose ``execute`` handles clipboard/upload/download events.
        user_provided_files: Optional local file paths offered for upload events.
        ui_q: Optional queue receiving one summary line per replayed event.
        main_loop: Event loop used to hand summary lines to ``ui_q`` thread-safely.
    """
    event_count = len(trace) if trace else 0
    logger.debug(
        f"[REPLAYER_SYNC __init__] Initializing. Page: {type(page)}, "
        f"Trace_events: {event_count}, Controller: {type(controller)}"
    )

    # Replay collaborators.
    self.page = page
    self.trace = trace
    self.controller = controller

    # Optional UI reporting channel.
    self.ui_q = ui_q
    self.main_loop = main_loop

    # Candidate files for upload events; a falsy argument becomes a fresh list.
    self.user_provided_files = user_provided_files if user_provided_files else []

    # Click-outcome flags, reset again at the start of every mouse click.
    self._clicked_with_selector = False
    self._clicked_dispatch = False
# ------------- main loop -------------
def play(self, speed: float = 2.0):
    """Replay every event of the trace in order.

    For each event a one-line summary is published (to ``ui_q`` via
    ``main_loop.call_soon_threadsafe`` when both were supplied, otherwise to
    the module logger), the recorded delay ``t`` (milliseconds) is slept
    after dividing it by ``speed``, and the event is applied and verified.
    Keyboard events are delegated to ``_batch_type``, which may consume
    several consecutive events at once. An unmodified ``v`` key press that is
    immediately followed by a ``clipboard_paste`` event is skipped so the
    paste is not preceded by a literal "v".

    Args:
        speed: Playback speed factor; recorded delays are divided by it.

    Raises:
        ValueError: If ``speed`` is not greater than zero.
        Drift: Propagated from ``_verify`` when the page state diverges.
    """
    # Fail fast: a zero speed would otherwise raise ZeroDivisionError only
    # once the first event is reached, and a negative one would hand a
    # negative duration to time.sleep().
    if speed <= 0:
        raise ValueError(f"speed must be greater than zero, got {speed!r}")

    i = 0
    logger.debug(f"[REPLAYER play] Starting play loop. Trace length: {len(self.trace)}")
    while i < len(self.trace):
        ev = self.trace[i]
        logger.debug(f"[REPLAYER play] Processing event {i+1}/{len(self.trace)}: Type: {ev.get('type')}, URL: {ev.get('url')}")

        # Avoid replaying the bare 'v' recorded just before a clipboard_paste.
        is_plain_v = (
            ev.get("type") == "keyboard_input"
            and ev.get("key") == "v"
            and not ev.get("modifiers")
        )
        if is_plain_v and (i + 1) < len(self.trace) and self.trace[i + 1].get("type") == "clipboard_paste":
            logger.info(f"[REPLAYER play] Skipping 'v' key press before clipboard_paste. Event {i+1}")
            i += 1  # Skip the 'v' key press; handle the paste in this iteration.
            ev = self.trace[i]
            logger.debug(f"[REPLAYER play] Now processing event {i+1}/{len(self.trace)}: Type: {ev.get('type')}, URL: {ev.get('url')}")

        # Publish the concise, iconic summary line.
        log_msg_str = self._summarize_event(ev)
        # Compare against None explicitly: a queue-like object that defines
        # __len__ would be falsy while empty and silently bypass the UI.
        if self.ui_q is not None and self.main_loop is not None:
            self.main_loop.call_soon_threadsafe(self.ui_q.put_nowait, log_msg_str)
        else:  # Fallback to standard logger if queue/loop not provided (e.g. during testing)
            logger.info(log_msg_str)

        # Delay logic: a missing/None 't' counts as no delay.
        event_delay_ms = ev.get("t") or 0
        delay_s = event_delay_ms / 1000.0 / speed
        logger.debug(f"[REPLAYER play] Event {i+1}: Applying delay of {event_delay_ms}ms, speed adjusted: {event_delay_ms/speed}ms")
        if event_delay_ms > 10:  # Log only if delay is > 10ms (to avoid spamming for 0ms delays)
            logger.debug(f"Pausing for {event_delay_ms/1000.0:.3f}s (speed adjusted: {delay_s:.3f}s)")
        if delay_s > 0:  # time.sleep() rejects negative durations
            time.sleep(delay_s)

        if ev["type"] == "keyboard_input":
            consumed = self._batch_type(i)
            i += consumed
            logger.debug(f"[REPLAYER play] Event {i+1-consumed} (keyboard_input batch): Consumed {consumed} events. New index: {i}")
            continue

        self._apply(ev)
        logger.debug(f"[REPLAYER play] Event {i+1}: _apply(ev) completed.")
        self._verify(ev)
        logger.debug(f"[REPLAYER play] Event {i+1}: _verify(ev) completed.")
        i += 1
    logger.debug(f"[REPLAYER play] Play loop finished.")

def _summarize_event(self, ev: Dict[str, Any]) -> str:
    """Build the comma-separated, icon-prefixed summary line for one event."""
    log_type = ev["type"]
    current_event_url = ev.get("url", "N/A")  # URL from the event itself
    parts: List[str] = []
    # Icons are written as escapes so they survive any re-encoding of this file.
    if log_type == "mouse_click":
        parts.append("\U0001F5B1\uFE0F MouseClick")  # computer-mouse emoji
        button_text = ev.get("text")
        selector = ev.get("selector")
        if button_text:
            parts.append(f'button_text:"{button_text}"')
        elif selector:
            parts.append(f'selector:"{selector}"')
        else:
            parts.append(f"xy:({ev.get('x', 'N/A')},{ev.get('y', 'N/A')})")
        button_type = ev.get("button", "left")
        if button_type != "left":  # Only show if not default left click
            parts.append(f'button:"{button_type}"')
        parts.append(f"url='{current_event_url}'")
    elif log_type == "keyboard_input":
        parts.append("\u2328\uFE0F KeyInput")  # keyboard emoji
        parts.append(f"key:'{ev.get('key')}'")
        modifiers = ev.get("modifiers")
        if modifiers:
            parts.append(f"mods:{modifiers}")
        parts.append(f"url='{current_event_url}'")
    elif log_type == "navigation":
        parts.append("\U0001F310 Navigation")  # globe emoji
        parts.append(f"to='{ev.get('to')}'")
    else:  # Generic fallback for other event types like scroll, viewport_change etc.
        parts.append(log_type.replace('_', ' ').title())
        s = ev.get("selector")
        if s:
            parts.append(f'selector:"{s}"')
        if 'x' in ev and 'y' in ev:
            parts.append(f"coords:({ev.get('x')},{ev.get('y')})")
        parts.append(f"url='{current_event_url}'")
    return ", ".join(parts)
# ------------- batching -------------
def _batch_type(self, idx: int) -> int:
ev_start_batch = self.trace[idx]
sel, mods = ev_start_batch.get("selector"), ev_start_batch.get("modifiers", [])
text_to_type = ""
current_idx_in_trace = idx
first_key = ev_start_batch.get("key", "")
is_first_key_batchable = len(first_key) == 1 and not mods
if is_first_key_batchable:
text_to_type = first_key
current_idx_in_trace = idx + 1
while current_idx_in_trace < len(self.trace):
nxt = self.trace[current_idx_in_trace]
if nxt["type"] != "keyboard_input" or nxt.get("t",1) != 0: break
if nxt.get("selector") != sel: break
if nxt.get("modifiers"): break
next_key_char = nxt.get("key", "")
if len(next_key_char) == 1:
text_to_type += next_key_char
current_idx_in_trace += 1
else:
break
current_idx_in_trace -= 1
num_events_processed = 0
if len(text_to_type) > 1:
self._apply_type(sel, text_to_type, [], ev_start_batch)
self._verify(ev_start_batch)
num_events_processed = current_idx_in_trace - idx + 1
else:
self._apply(ev_start_batch)
self._verify(ev_start_batch)
num_events_processed = 1
return num_events_processed
def _apply_type(self, sel: Optional[str], text: str, mods: List[str], original_event_for_log: Dict[str, Any]):
    """Enter a batch of characters, preferring ``locator.fill`` on ``sel``.

    With a selector, the first matching element is awaited (visible),
    focused and filled. If any of that fails, or when no selector was
    recorded, the text is typed through ``page.keyboard.type`` while the
    given modifiers are held down.

    Args:
        sel: Selector of the target element, or None.
        text: Characters to enter.
        mods: Recorded modifier names (keys of ``MOD_MAP``) to hold while
            typing via the keyboard fallback.
        original_event_for_log: First event of the batch; currently unused.
    """
    log_sel_for_type = sel or "N/A"
    logger.debug(f"APPLYING BATCH TYPE: '{text}' -> {log_sel_for_type}")
    if sel:
        try:
            element_to_fill = self.page.locator(sel).first
            element_to_fill.wait_for(state='visible', timeout=5000)
            element_to_fill.focus(timeout=1000)
            time.sleep(0.2)  # Short delay after focus before filling
            # NOTE(review): fill() sets the field's value rather than
            # appending to it - confirm that is intended when a field is
            # typed into in several separate batches.
            element_to_fill.fill(text)
        except Exception as e_fill:
            logger.error(f"Error during locator.fill('{text}') for selector '{sel}': {e_fill.__class__.__name__} - {str(e_fill)}. Falling back to keyboard.type.")
            self._keyboard_type_with_mods(text, mods, f"Error during fallback page.keyboard.type('{text}')")
    else:
        # No selector: general keyboard typing (less common for batched text)
        logger.warning(f"Attempting to batch type '{text}' without a selector. Using page.keyboard.type().")
        self._keyboard_type_with_mods(text, mods, f"Error during page.keyboard.type('{text}') without selector")
    # "\u2705" is the check-mark emoji, escaped so it survives re-encoding.
    logger.debug(f"\u2705 done BATCH TYPE: '{text}' -> {log_sel_for_type}")

def _keyboard_type_with_mods(self, text: str, mods: List[str], error_label: str) -> None:
    """Type ``text`` via the page keyboard while holding ``mods``; log typing errors."""
    mapped_mods = [self.MOD_MAP[m] for m in mods if m in self.MOD_MAP]
    held: List[str] = []
    try:
        for m_down in mapped_mods:
            self.page.keyboard.down(m_down)
            held.append(m_down)
        try:
            self.page.keyboard.type(text)
        except Exception as e_type:
            logger.error(f"{error_label}: {e_type.__class__.__name__} - {str(e_type)}")
    finally:
        # Release exactly the modifiers that went down, in reverse order,
        # even if pressing one of them failed part-way through.
        for m_up in reversed(held):
            self.page.keyboard.up(m_up)
# ------------- apply -------------
def _apply(self, ev: Dict[str, Any]):
    """Perform the action described by one trace event.

    ``navigation``, ``mouse_click`` and ``keyboard_input`` are executed on the
    page; ``clipboard_copy``, ``clipboard_paste``, ``file_upload`` and
    ``file_download`` are forwarded to ``self.controller.execute``. Any other
    event type is only logged.

    Args:
        ev: The trace event; must contain ``type`` plus the keys its handler
            reads (e.g. ``to`` for navigation, ``key`` for keyboard input).
    """
    typ = ev["type"]
    sel_event = ev.get("selector")
    logger.debug(f"[REPLAYER _apply] Applying action: {typ}, selector: {sel_event}, keys: {ev.get('key')}, to: {ev.get('to')}")
    logger.debug(f"APPLYING ACTION: {typ} for sel={sel_event or 'N/A'}, key={ev.get('key','N/A')}")

    # Emoji in the log lines below are written as escapes so they survive
    # re-encoding of this file.
    if typ == "navigation":
        self._apply_navigation_event(ev)
    elif typ == "mouse_click":
        self._apply_mouse_click_event(ev, sel_event)
    elif typ == "keyboard_input":
        self._apply_key_press_event(ev)
    elif typ == "clipboard_copy":
        logger.debug(f"[REPLAYER _apply] Executing clipboard_copy controller action.")
        self.controller.execute("Copy text to clipboard", text=ev["text"])
        logger.info(f"\U0001F4CB Executed Copy: text='{(ev['text'][:30] + '...') if len(ev['text']) > 30 else ev['text']}'")
    elif typ == "clipboard_paste":
        logger.debug(f"[REPLAYER _apply] Executing clipboard_paste controller action for selector: {ev['selector']}.")
        self.controller.execute("Paste text from clipboard", selector=ev["selector"])
        logger.info(f"\U0001F4CB Executed Paste into selector='{ev['selector']}'")
    elif typ == "file_upload":
        self._apply_file_upload_event(ev)
    elif typ == "file_download":
        # Pass all necessary info from the event to the controller
        logger.debug(f"[REPLAYER _apply] Executing file_download controller action for: {ev.get('suggested_filename')}")
        self.controller.execute(
            "Download remote file",
            url=ev.get("download_url"),  # Original download URL (for info/logging)
            suggested_filename=ev.get("suggested_filename"),
            recorded_local_path=ev.get("recorded_local_path"),  # Path to file saved during recording
        )
        logger.info(f"\U0001F4BE Replay: Executed 'Download remote file' action for: {ev.get('suggested_filename')}")
    else:
        logger.debug(f"\u2705 done {typ} (no specific apply action in this path or already handled by controller.execute)")
        logger.debug(f"[REPLAYER _apply] Action {typ} applied (end of _apply general path).")

def _apply_navigation_event(self, ev: Dict[str, Any]) -> None:
    """Navigate to ``ev['to']`` unless already there, then wait for the page to settle."""
    target = ev["to"]
    if not self._url_eq(self.page.url, target):
        logger.debug(f"[REPLAYER _apply NAV] Attempting self.page.goto('{target}')")
        try:
            self.page.goto(target, wait_until="domcontentloaded", timeout=15000)
            logger.debug(f"[REPLAYER _apply NAV] self.page.goto to '{target}' SUCCEEDED.")
        except SyncPlaywrightTimeoutError as pte_goto:
            logger.error(f"[REPLAYER _apply NAV] PlaywrightTimeoutError during goto '{target}': {pte_goto}", exc_info=True)
        except Exception as e_goto_general:
            logger.error(f"[REPLAYER _apply NAV] Exception during goto '{target}': {e_goto_general}", exc_info=True)
    else:
        logger.debug(f"[REPLAYER _apply NAV] Page URL {self.page.url} already matches target {target}. Skipping goto.")

    logger.debug(f"[REPLAYER _apply NAV] Attempting page.bring_to_front() for {target}")
    self.page.bring_to_front()
    logger.debug(f"[REPLAYER _apply NAV] page.bring_to_front() completed for {target}")

    # Best-effort settle: a timeout here is logged, never raised.
    try:
        logger.debug(f"Waiting for 'load' state after navigating to {target}")
        logger.debug(f"[REPLAYER _apply NAV] Attempting page.wait_for_load_state('load') for {target}")
        self.page.wait_for_load_state('load', timeout=10000)
        logger.debug(f"[REPLAYER _apply NAV] page.wait_for_load_state('load') completed for {target}")
        logger.debug(f"'load' state confirmed for {target}. Now waiting for networkidle.")
        logger.debug(f"[REPLAYER _apply NAV] Attempting page.wait_for_load_state('networkidle') for {target}")
        self.page.wait_for_load_state('networkidle', timeout=3000)
        logger.debug(f"[REPLAYER _apply NAV] page.wait_for_load_state('networkidle') completed for {target}")
        logger.debug(f"[REPLAYER _apply NAV] Attempting time.sleep(0.3) for {target}")
        time.sleep(0.3)  # Small buffer
        logger.debug(f"[REPLAYER _apply NAV] time.sleep(0.3) completed for {target}")
        logger.debug(f"Network idle (or timeout) confirmed for {target}")
    except Exception as e_wait:
        logger.warning(f"Timeout or error during page load/networkidle wait on {target}: {e_wait.__class__.__name__} - {str(e_wait)}")
    logger.info(f"\u2705 \U0001F310 Navigated: {target}")
    logger.debug(f"[REPLAYER _apply] Action navigation applied.")

def _apply_mouse_click_event(self, ev: Dict[str, Any], sel_event: Optional[str]) -> None:
    """Click the recorded target: locator click, forced click, dispatched event, then XY."""
    btn = self.BTN_MAP.get(ev.get("button", "left"), "left")
    recorded_text = (ev.get("text") or "").lower()
    self._clicked_with_selector = False
    self._clicked_dispatch = False

    if sel_event:
        loc = self._resolve_click_locator(sel_event)
        if loc:
            if self._click_resolved_locator(loc, sel_event, btn, recorded_text, ev):
                return
            if self._dispatch_click(loc, sel_event):
                return

    # Every selector-based attempt failed (or there was no selector).
    log_x, log_y = ev.get("x"), ev.get("y")
    if log_x is not None and log_y is not None:
        logger.info(f"Fallback: Performing coordinate-based click at ({log_x},{log_y})")
        self.page.mouse.click(log_x, log_y, button=btn)
        time.sleep(0.25)
    elif sel_event:
        logger.error(f"All click attempts failed for selector '{sel_event}' and no XY coordinates available.")

def _click_is_critical(self, recorded_text: str, sel_event: Optional[str]) -> bool:
    """Return True when the click text/selector suggests a submit-like action."""
    critical_keywords = (
        "download", "save", "submit", "next", "continue", "confirm", "upload", "add", "create",
        "process", "generate", "apply", "send", "post", "tweet", "run", "execute",
        "search", "go", "login", "signup", "pay", "checkout", "agree", "accept", "allow",
    )
    # Specific known critical element identifiers.
    critical_selector_markers = (
        'data-testid="send-button"',
        'data-testid*="submit"',
        'data-testid*="send"',
        'id*="submit-button"',
        'data-testid*="tweetbutton"',
        'id*="composer-submit-button"',  # for chatgpt (example)
    )
    sel_event_lower = sel_event.lower() if sel_event else ""
    if any(keyword in recorded_text for keyword in critical_keywords):
        return True
    if sel_event_lower and any(keyword in sel_event_lower for keyword in critical_keywords):
        return True
    return any(marker in sel_event_lower for marker in critical_selector_markers)

def _click_resolved_locator(self, loc: Any, sel_event: str, btn: str, recorded_text: str, ev: Dict[str, Any]) -> bool:
    """Try a standard click, then a forced click, on ``loc``; return True on success."""
    wait_timeout = 5000  # Default explicit wait timeout
    try:
        logger.debug(f"Attempting to click resolved locator for original selector: {sel_event}")
        if self._click_is_critical(recorded_text, sel_event):
            # Use original recorded text for logging if available, else empty string
            log_text = ev.get('text', '')
            logger.info(f"Critical action suspected (text: '{log_text}', selector: '{sel_event}'). Extending wait.")
            wait_timeout = 15000  # 15 seconds
        logger.debug(f"Waiting for selector '{sel_event}' to be visible and enabled with timeout {wait_timeout}ms.")
        loc.wait_for(state='visible', timeout=wait_timeout)
        loc.scroll_into_view_if_needed(timeout=wait_timeout)
        logger.debug(f"Element '{sel_event}' is visible and enabled. Attempting standard click.")
        # Logged (not printed) for consistency with the rest of the replayer.
        logger.debug(f"[REPLAYER _apply CLICK] >>> Attempting loc.click() for '{sel_event}' with timeout {wait_timeout}ms")
        try:
            loc.click(button=btn, timeout=wait_timeout, delay=100)
        except SyncPlaywrightTimeoutError as pte_click:
            logger.warning(f"[REPLAYER _apply CLICK] PlaywrightTimeoutError during standard loc.click() for '{sel_event}': {pte_click}")
        except Exception as e_click:
            logger.warning(f"[REPLAYER _apply CLICK] Exception during standard loc.click() for '{sel_event}': {e_click}", exc_info=True)
        else:
            self._clicked_with_selector = True
            logger.debug(f"[REPLAYER _apply CLICK] loc.click() for '{sel_event}' SUCCEEDED.")
            logger.info(f"Standard Playwright click successful for resolved locator from selector: {sel_event}")
            time.sleep(0.25)  # Keep small delay after successful click
            return True

        # Fallback 2: same click, bypassing Playwright's actionability checks.
        logger.debug(f"[REPLAYER _apply CLICK] Fallback 2: Attempting loc.click(force=True) for '{sel_event}'")
        try:
            loc.click(button=btn, timeout=wait_timeout, delay=100, force=True)
        except SyncPlaywrightTimeoutError as pte_force_click:
            logger.warning(f"[REPLAYER _apply CLICK] PlaywrightTimeoutError during loc.click(force=True) for '{sel_event}': {pte_force_click}")
        except Exception as e_force_click:
            logger.warning(f"[REPLAYER _apply CLICK] Exception during loc.click(force=True) for '{sel_event}': {e_force_click}", exc_info=True)
        else:
            self._clicked_with_selector = True
            logger.info(f"Forced Playwright click successful for '{sel_event}'")
            time.sleep(0.25)
            return True
    except SyncPlaywrightTimeoutError as e_timeout:
        # Raised by wait_for / scroll_into_view_if_needed; caller falls through.
        logger.warning(f"Timeout ({wait_timeout}ms) waiting for element '{sel_event}' (visible/enabled) or during click: {e_timeout.__class__.__name__}")
    except Exception as e_click_attempt1:
        logger.warning(f"Standard Playwright click (attempt 1) for resolved locator from '{sel_event}' failed: {e_click_attempt1.__class__.__name__} ({str(e_click_attempt1)})")
    return False

def _dispatch_click(self, loc: Any, sel_event: str) -> bool:
    """Fallback 3: dispatch a synthetic 'click' event on ``loc``; return True on success."""
    try:
        logger.info(f"Fallback 3 (Final): Attempting to dispatch click event for resolved locator from '{sel_event}'")
        logger.debug(f"[REPLAYER _apply CLICK] Fallback 3: Attempting dispatchEvent for '{sel_event}'")
        if loc.count() == 0:
            logger.error(f"Cannot dispatch click for '{sel_event}', resolved locator is empty.")
            return False
        element_handle = loc.element_handle(timeout=1000)
        if element_handle:
            element_handle.dispatch_event('click')
            via = "element_handle"
        else:
            loc.dispatch_event('click')
            via = "locator"
        self._clicked_dispatch = True
        self._clicked_with_selector = True
        logger.info(f"DispatchEvent (via {via}) click successful for '{sel_event}'")
        time.sleep(0.25)
        return True
    except Exception as e_dispatch:
        logger.warning(f"DispatchEvent click failed for '{sel_event}': {e_dispatch.__class__.__name__} ({str(e_dispatch)}). Falling back to XY if available.")
        return False

def _apply_key_press_event(self, ev: Dict[str, Any]) -> None:
    """Focus the recorded element (best effort) and press one key with its modifiers."""
    key_to_press = ev["key"]
    modifiers_for_press = ev.get("modifiers", [])
    sel_for_press = ev.get("selector")
    logger.debug(f"APPLYING SINGLE KEY PRESS: '{key_to_press}' (mods: {modifiers_for_press}) -> {sel_for_press or 'no specific target'}")
    if sel_for_press:
        try:
            target_loc_key_press = self.page.locator(sel_for_press).first
            if target_loc_key_press.count() > 0:
                target_loc_key_press.focus(timeout=800)
            else:
                logger.warning(f"Target element for key press not found: {sel_for_press}")
        except Exception as e_focus_single_key:
            logger.debug(f"Focus failed for selector '{sel_for_press}' during single key press: {e_focus_single_key.__class__.__name__}")

    mapped_mods_press = [self.MOD_MAP[m] for m in modifiers_for_press if m in self.MOD_MAP]
    held: List[str] = []
    try:
        for m_down_key in mapped_mods_press:
            self.page.keyboard.down(m_down_key)
            held.append(m_down_key)
        try:
            self.page.keyboard.press(key_to_press)
        except Exception as e_press:
            logger.error(f"Error during page.keyboard.press('{key_to_press}'): {e_press.__class__.__name__} - {str(e_press)}")
    finally:
        # Release whatever was pressed, in reverse order, even on failure.
        for m_up_key in reversed(held):
            self.page.keyboard.up(m_up_key)
    logger.debug(f"\u2705 done SINGLE KEY PRESS: '{key_to_press}' -> {sel_for_press or 'no specific target'}")

def _find_upload_file(self, ev: Dict[str, Any]) -> Optional[str]:
    """Pick an existing local file for an upload event, or None.

    Candidates, in order: a user-provided path whose file name equals the
    recorded ``file_name``; the recorded ``file_path``; ``~/Downloads/<file_name>``.
    """
    trace_file_name = ev.get("file_name")
    if trace_file_name and self.user_provided_files:
        for user_file_path_str in self.user_provided_files:
            user_file_path = Path(user_file_path_str)
            if user_file_path.name != trace_file_name:
                continue
            if user_file_path.exists():
                logger.info(f"Using user-provided file for '{trace_file_name}': {str(user_file_path)}")
                return str(user_file_path)
            logger.warning(f"User-provided file '{user_file_path_str}' for '{trace_file_name}' does not exist.")

    trace_event_file_path = ev.get("file_path")  # From the original recording (often empty)
    if trace_event_file_path:
        path_obj = Path(trace_event_file_path).expanduser()
        if path_obj.exists():
            logger.info(f"Using file_path from trace for '{trace_file_name or 'unknown'}': {str(path_obj)}")
            return str(path_obj)
        logger.warning(f"file_path '{trace_event_file_path}' from trace for '{trace_file_name or 'unknown'}' does not exist.")

    if trace_file_name:
        fallback_path = Path(f"~/Downloads/{trace_file_name}").expanduser()
        if fallback_path.exists():
            logger.info(f"Using fallback file for '{trace_file_name}': {str(fallback_path)}")
            return str(fallback_path)
        logger.warning(f"Fallback file '{fallback_path}' for '{trace_file_name}' does not exist.")
    return None

def _apply_file_upload_event(self, ev: Dict[str, Any]) -> None:
    """Resolve a local file for the upload event and hand it to the controller."""
    logger.debug(f"[REPLAYER _apply] Processing file_upload for selector: {ev['selector']}, file_name: {ev.get('file_name')}")
    trace_file_name = ev.get("file_name")
    file_path_to_upload = self._find_upload_file(ev)
    if not file_path_to_upload:
        logger.error(f"Could not determine a valid file path for upload event: {ev}. Skipping upload.")
        return
    logger.debug(f"[REPLAYER _apply] Executing file_upload controller action with path: {file_path_to_upload}")
    self.controller.execute("Upload local file",
                            selector=ev["selector"],
                            file_path=file_path_to_upload)
    logger.info(f"\U0001F4E4 Executed Upload: file='{trace_file_name or Path(file_path_to_upload).name}' (path: '{file_path_to_upload}') to selector='{ev['selector']}'")
def _resolve_click_locator(self, sel: str) -> Optional[SyncLocator]:
    """Resolve a recorded selector to the locator that should receive the click.

    Recorded selectors often point at an inner node of a button (for
    example a ``span``). Resolution order:

    1. the first element matching ``sel`` when it is itself a ``<button>``
       or carries ``role="button"``;
    2. otherwise its closest ancestor that is a ``<button>`` or has
       ``role="button"``;
    3. otherwise the matched element itself.

    Args:
        sel: Selector recorded with the click event.

    Returns:
        The locator to click, or None when ``sel`` is empty or matches nothing.
    """
    if not sel:
        return None

    element_loc: SyncLocator = self.page.locator(sel).first

    # The element might not exist or be stale, so the self-check is best effort.
    try:
        if element_loc.count() > 0 and element_loc.evaluate(
            "el => el.tagName === 'BUTTON' || el.getAttribute('role') === 'button'"
        ):
            logger.debug(f"_resolve_click_locator: Initial selector '{sel}' is already a button or has role='button'. Using it.")
            return element_loc
    except Exception as e_eval_initial:
        logger.debug(f"_resolve_click_locator: Error evaluating initial selector '{sel}': {e_eval_initial}. Will try to find ancestor.")

    if element_loc.count() == 0:
        logger.debug(f"_resolve_click_locator: Initial selector '{sel}' found no elements. Cannot resolve to a button.")
        return None

    # An XPath union is returned in document order, i.e. outermost ancestor
    # first; "[last()]" on the parenthesised node-set therefore selects the
    # match closest to the element (or the element itself).
    potential_button_loc = element_loc.locator(
        "xpath=(ancestor-or-self::button | ancestor-or-self::*[@role='button'])[last()]"
    ).first
    if potential_button_loc.count() > 0:
        logger.debug(f"_resolve_click_locator: Found button/role=button for '{sel}' via ancestor-or-self. Using it.")
        return potential_button_loc

    logger.debug(f"_resolve_click_locator: No button ancestor found for specific element of '{sel}'. Falling back to initial locator if it exists.")
    return element_loc
# ------------- verify -------------
def _verify_tweet_posted(self):
    """Wait up to 3 seconds for an alert containing "sent" and log the outcome.

    Never raises: a missing toast (or any other error while waiting) is
    reported through the logger only.
    """
    toast_selector = '[role=alert]:text("sent")'
    try:
        self.page.wait_for_selector(toast_selector, timeout=3000)
    except Exception as toast_error:
        failure_kind = toast_error.__class__.__name__
        logger.error(f"Tweet post verification failed: 'sent' toast not found within timeout. Error: {failure_kind}")
    else:
        logger.info("Tweet post verification successful: 'sent' toast found.")
def _verify(self, ev: Dict[str, Any]):
# Post-action verification for a single replayed trace event `ev`.
#
# Dispatches on ev["type"]:
#   * "navigation"     - compares self.page.url with the recorded target ev["to"].
#   * "mouse_click"    - when self._clicked_with_selector is truthy and the event
#                        carries a selector: tweet-toast check, dispatchEvent skip,
#                        or a log-only inner-text comparison.
#   * "keyboard_input" - log-only check that some element has focus.
#   * otherwise        - compares self.page.url with the recorded ev["url"].
#
# Raises Drift when a URL mismatch is not explained by the next trace event
# (see the "drift recovery" comments below), or when `ev` cannot be located
# in self.trace while attempting that recovery.
#
# NOTE(review): leading indentation is missing in this copy of the file, so the
# nesting implied by these comments (in particular which `if` each bare `return`
# belongs to) is inferred from the statement order - confirm against the
# original source before relying on it.
typ = ev["type"]
sel_from_event_verify = ev.get("selector")
# ---- navigation events: the page should now be at ev["to"] ----
if typ == "navigation":
if not TraceReplayerSync._url_eq(self.page.url, ev["to"]):
# ev["url"] is used only for the warning message in this branch.
current_event_expected_url = ev["url"]
nav_target_url = ev["to"]
# self.page.url is read a second time here and compared again, so a URL that
# changed since the first comparison is still accepted.
actual_page_url = self.page.url
if TraceReplayerSync._url_eq(actual_page_url, nav_target_url):
logger.debug(f"Navigation URL verified: Expected target {nav_target_url}, Got {actual_page_url}")
return
logger.warning(f"Potential Navigation URL drift: Expected target {nav_target_url}, but current URL is {actual_page_url}. Original event recorded at {current_event_expected_url}")
# Locate `ev` in the trace so the event after it can be inspected.
# NOTE(review): list.index compares by equality and returns the first match;
# if the trace can contain two equal events this may resolve to the earlier
# one - confirm whether events are guaranteed unique.
current_event_index = -1
try:
current_event_index = self.trace.index(ev)
except ValueError:
logger.error("Critical: Could not find current navigation event in trace for drift recovery. Raising drift based on target mismatch.")
raise Drift(f"URL drift for navigation: expected target {nav_target_url}, got {actual_page_url}", ev)
# Drift recovery: only possible when `ev` is not the last event in the trace.
if 0 <= current_event_index < len(self.trace) - 1:
next_event = self.trace[current_event_index + 1]
logger.debug(f"Drift check for navigation: Next event is type '{next_event.get('type')}', URL '{next_event.get('url')}', To '{next_event.get('to')}'")
# The mismatch is tolerated when the next event is itself a navigation and the
# page already sits at either that event's target ("to") or its recorded "url".
if next_event.get("type") == "navigation":
next_event_nav_target_url = next_event.get("to")
next_event_recorded_at_url = next_event.get("url")
if next_event_nav_target_url and TraceReplayerSync._url_eq(actual_page_url, next_event_nav_target_url):
logger.info(f"Drift recovery for navigation: Actual URL {actual_page_url} matches TARGET of NEXT navigation. Allowing.")
return
if next_event_recorded_at_url and TraceReplayerSync._url_eq(actual_page_url, next_event_recorded_at_url):
logger.info(f"Drift recovery for navigation: Actual URL {actual_page_url} matches RECORDED URL of NEXT navigation. Allowing.")
return
# No recovery condition matched: report the navigation drift to the caller.
logger.error(f"URL drift CONFIRMED for navigation: expected target {nav_target_url}, got {actual_page_url}")
raise Drift(f"URL drift for navigation: expected target {nav_target_url}, got {actual_page_url}", ev)
return
# ---- clicks that were performed through a selector ----
if typ == "mouse_click" and self._clicked_with_selector and sel_from_event_verify:
# Selectors containing "tweetButton" are verified via the "sent" toast instead.
if "tweetButton" in sel_from_event_verify:
self._verify_tweet_posted()
return
# _clicked_dispatch is read with getattr and a False default, so the attribute
# does not have to exist on the instance.
if getattr(self, "_clicked_dispatch", False):
logger.info(f"Verification for selector '{sel_from_event_verify}': Skipped standard DOM check as dispatchEvent was used (element might be detached/changed).")
return
# Optional inner-text comparison. Every outcome here (match, mismatch, element
# not found, exception) is only logged; nothing in this section raises Drift.
recorded_text = ev.get("text")
if recorded_text is not None:
try:
# The element is looked up again after the click via _resolve_click_locator.
verify_loc = self._resolve_click_locator(sel_from_event_verify)
if verify_loc and verify_loc.count() > 0:
# The live text is stripped before comparison; recorded_text is compared as-is.
current_text = (verify_loc.inner_text(timeout=1000)).strip()
if current_text == recorded_text:
logger.info(f"Inner text matched for {sel_from_event_verify}: '{recorded_text}'")
else:
logger.warning(f"Text drift for {sel_from_event_verify}: expected '{recorded_text}', got '{current_text}'")
else:
logger.warning(f"Cannot verify text for {sel_from_event_verify}, element not found by re-resolving after click.")
except Exception as e_text_verify:
logger.warning(f"Error during text verification for {sel_from_event_verify}: {str(e_text_verify)}")
return
# ---- keyboard input: log-only focus check ----
if typ == "keyboard_input":
try:
# Evaluated in the page: true when an element other than <body> has focus.
active_element_focused = self.page.evaluate("document.activeElement !== null && document.activeElement !== document.body")
if not active_element_focused:
logger.debug("No specific element has focus after typing for event: %s", ev.get("selector"))
except Exception as e:
logger.debug("Error checking active element after typing: %s", e)
return
# ---- remaining events: generic checks ----
# A click event that carries a selector but for which _clicked_with_selector is
# falsy is only noted at debug level (the log message calls this the XY fallback).
if ev["type"] == "mouse_click" and ev.get("selector") and not self._clicked_with_selector:
# The XY fallback can itself be a mild form of drift; it is not treated as an
# error here.
logger.debug(f"Verification: Click for selector '{ev['selector']}' used XY fallback.")
# URL drift check: the page is expected to be at the URL recorded in ev["url"].
current_event_expected_url = ev["url"]
actual_page_url = self.page.url
if not TraceReplayerSync._url_eq(actual_page_url, current_event_expected_url):
logger.warning(f"Potential URL drift: expected {current_event_expected_url} (from event record), got {actual_page_url} (actual browser URL).")
current_event_index = -1
try:
# Find the index of the current event 'ev' in self.trace.
# list.index is a linear scan; acceptable for moderately sized traces.
# NOTE(review): it matches by equality and returns the first hit - confirm
# that equal events cannot occur earlier in the trace.
current_event_index = self.trace.index(ev)
except ValueError:
logger.error("Critical: Could not find current event in trace for drift recovery. This shouldn't happen. Raising original drift.")
raise Drift(f"URL drift (and event indexing error): expected {current_event_expected_url}, got {actual_page_url}", ev)
# Drift recovery: only possible when `ev` is not the last event in the trace.
if 0 <= current_event_index < len(self.trace) - 1:
next_event = self.trace[current_event_index + 1]
logger.debug(f"Drift check: Next event is type '{next_event.get('type')}', URL '{next_event.get('url')}', To '{next_event.get('to')}'")
if next_event.get("type") == "navigation":
next_event_target_url = next_event.get("to")
next_event_recorded_at_url = next_event.get("url")
# Condition 1: the browser is AT the target URL of the NEXT navigation event,
# i.e. the current event already led to where next_event will go.
if next_event_target_url and TraceReplayerSync._url_eq(actual_page_url, next_event_target_url):
logger.info(f"Drift recovery: Actual URL {actual_page_url} matches TARGET ('to') of the NEXT navigation event. Allowing.")
return
# Condition 2: the browser is AT the URL where the NEXT navigation event was
# RECORDED, e.g. after a quick redirect chain the page has landed on the 'url'
# from which next_event was initiated. Only relevant when that 'url' differs
# from the next event's 'to'.
if next_event_recorded_at_url and TraceReplayerSync._url_eq(actual_page_url, next_event_recorded_at_url):
logger.info(f"Drift recovery: Actual URL {actual_page_url} matches RECORDED URL ('url') of the NEXT navigation event. Allowing.")
return
# If no recovery condition was met, raise the original drift error.
logger.error(f"URL drift CONFIRMED after checks: expected {current_event_expected_url} (from event record), got {actual_page_url} (actual browser URL).")
raise Drift(f"URL drift: expected {current_event_expected_url}, got {actual_page_url}", ev)
else:
logger.debug(f"URL verified: Expected {current_event_expected_url}, Got {actual_page_url}")
# ---------- util ----------
@staticmethod
def _url_eq(a, b):
if not a or not b: return False
pa, pb = urlparse(a), urlparse(b)
if pa.netloc.replace('www.','') != pb.netloc.replace('www.',''): return False
if pa.path.rstrip('/') != pb.path.rstrip('/'): return False
KEEP = {'q','tbm','hl'}
qa = {k:v for k,v in parse_qs(pa.query).items() if k in KEEP}
qb = {k:v for k,v in parse_qs(pb.query).items() if k in KEEP}
return qa == qb
# --------------------------------------------------
# CLI demo (optional)
# --------------------------------------------------
async def _cli_demo(url: str, trace_path: str) -> None:
    """Open ``url`` in a visible Chromium window and load the trace file.

    The replay step is currently disabled: this demo has not been ported to
    ``TraceReplayerSync`` / ``CustomControllerSync`` yet, so after the initial
    navigation and trace loading the browser is simply closed again.

    Args:
        url: First page to open before any replay would start.
        trace_path: Path of the trace file passed to ``load_trace``.
    """
    # Imported inside the function, so the async Playwright API is only loaded
    # when the demo actually runs.
    from playwright.async_api import async_playwright

    print("[CLI_DEMO] WARNING: _cli_demo is not yet updated for TraceReplayerSync and CustomControllerSync. Skipping full replay test.", flush=True)
    logger.info(f"CLI Demo: Replaying trace '{trace_path}' starting at URL '{url}'")

    async with async_playwright() as pw:
        # headless=False so the run can be observed.
        browser = await pw.chromium.launch(headless=False)
        # A single finally closes the browser on every exit path, including
        # early returns and unexpected exceptions.
        try:
            # Fresh context per run for isolation.
            context = await browser.new_context()
            page = await context.new_page()

            # `url` is only the very first page to open; navigation events in
            # the trace are the replayer's responsibility.
            logger.info(f"CLI Demo: Initial navigation to {url}")
            try:
                await page.goto(url, wait_until="networkidle", timeout=15000)
            except Exception as e_goto:
                # Best effort: a failed or slow initial load does not abort the demo.
                logger.warning(f"CLI Demo: Initial goto to {url} failed or timed out: {e_goto}. Attempting to continue replay.")

            try:
                trace_events = load_trace(trace_path)
                if not trace_events:
                    logger.error(f"CLI Demo: No events found in trace file: {trace_path}")
                    return
                logger.info(f"CLI Demo: Loaded {len(trace_events)} events from {trace_path}")
            except Exception as e_load:
                logger.error(f"CLI Demo: Failed to load trace file {trace_path}: {e_load}")
                return

            # TODO: re-enable once CustomControllerSync is ready. Intended replay step:
            #   controller = CustomController()
            #   rep = TraceReplayerSync(page, trace_events, controller, user_provided_files=None)
            #   try:
            #       rep.play(speed=1)  # 1.0 is real-time, higher is faster
            #       logger.info("✅ CLI Demo: Replay completed")
            #   except Drift as d:
            #       logger.error(f"⚠️ CLI Demo: Drift detected during replay: {d}")
            #       if d.event:
            #           logger.error(f"Drift occurred at event: {json.dumps(d.event, indent=2)}")
            #   except Exception as e_play:
            #       logger.error(f"💥 CLI Demo: An error occurred during replay: {e_play}", exc_info=True)
            print("[CLI_DEMO] Intentionally skipping replay part in CLI demo for now.", flush=True)
        finally:
            await browser.close()
if __name__ == "__main__":
    import asyncio as _a
    import sys

    # Both positional arguments (start URL and trace file) are required.
    if len(sys.argv) < 3:
        print("Usage: python src/utils/replayer.py <start_url> <path_to_trace_file.jsonl>")
        sys.exit(1)

    # Configure logging for the CLI demo when the module is run directly.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    )
    _a.run(_cli_demo(sys.argv[1], sys.argv[2]))