Spaces:
Sleeping
Sleeping
File size: 9,523 Bytes
94ff58a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
import pdb
import pyperclip
from pathlib import Path
import shutil
from typing import Optional, Type, Any
from pydantic import BaseModel
from browser_use.agent.views import ActionResult
# BaseBrowserContext might still be needed if base Controller class uses it, or for type hints if some methods expect it.
# For now, if only used by removed registered actions, it can be removed. Let's assume it can be removed for now.
# from browser_use.browser.context import BrowserContext as BaseBrowserContext
from browser_use.controller.service import Controller
# Sync Playwright imports
from playwright.sync_api import Page as SyncPage, BrowserContext as SyncBrowserContext, TimeoutError as SyncPlaywrightTimeoutError # ADDED/MODIFIED
from main_content_extractor import MainContentExtractor
from browser_use.controller.views import (
ClickElementAction,
DoneAction, # Keep if used by base or other parts
ExtractPageContentAction,
GoToUrlAction,
InputTextAction,
OpenTabAction,
ScrollAction,
SearchGoogleAction,
SendKeysAction,
SwitchTabAction,
)
import logging
import platform # For OS detection for paste shortcut
logger = logging.getLogger(__name__)
class CustomControllerSync(Controller):
    """Controller that extends browser_use controller functionality using the sync Playwright API.

    The controller is bound to a single synchronous Playwright page and
    directly handles a fixed set of replay actions (see ``execute``):
    "Upload local file", "Download remote file", "Copy text to clipboard"
    and "Paste text from clipboard".
    """

    # Recorded selectors that are known not to be focusable paste targets.
    _PROBLEMATIC_PASTE_SELECTORS = ("br",)

    def __init__(
        self,
        page: SyncPage,  # Expects a synchronous Playwright Page instance
        exclude_actions: Optional[list[str]] = None,
        output_model: Optional[Type[BaseModel]] = None,
    ):
        """Store the synchronous page and initialise the base controller.

        Args:
            page: Synchronous Playwright page that all actions operate on.
            exclude_actions: Action names forwarded to the base ``Controller``.
                ``None`` (the default) is forwarded as a fresh empty list so
                that no list object is shared between controller instances.
            output_model: Optional pydantic model forwarded to the base
                ``Controller``.
        """
        if exclude_actions is None:
            exclude_actions = []
        super().__init__(exclude_actions=exclude_actions, output_model=output_model)
        self.page = page  # Store the synchronous page
        self.browser_context = page.context  # Store its context (which is also sync)

    def execute(self, action_name: str, **kwargs: Any) -> ActionResult:
        """Synchronously execute one of the directly supported replay actions.

        Args:
            action_name: One of "Upload local file", "Download remote file",
                "Copy text to clipboard" or "Paste text from clipboard".
            **kwargs: Action-specific arguments:
                * Upload: ``selector``, ``file_path`` (both required).
                * Download: ``recorded_local_path`` (required),
                  ``suggested_filename``, ``dest_dir`` (default
                  ``~/Downloads``), ``url`` (only used for logging).
                * Copy: ``text`` (``None`` is treated as an empty string).
                * Paste: ``selector`` (optional element to focus first).

        Returns:
            An ``ActionResult`` whose ``extracted_content`` describes the outcome.

        Raises:
            RuntimeError: If no page is stored on the controller.
            ValueError: If a required argument for the action is missing.
            FileNotFoundError: If the recorded download file does not exist.
            NotImplementedError: If ``action_name`` is not handled here.
            Exception: Any error raised by the underlying operation is logged
                and re-raised unchanged.
        """
        logger.debug(f"CustomControllerSync.execute CALLED for action: '{action_name}', args: {kwargs}")

        page = self.page  # Use the stored synchronous page
        if not page:
            logger.error("CustomControllerSync.execute: self.page is not set (should have been in __init__).")
            raise RuntimeError("Failed to get current page for action execution.")

        if action_name == "Upload local file":
            return self._replay_upload(page, kwargs)
        if action_name == "Download remote file":
            return self._replay_download(kwargs)
        if action_name == "Copy text to clipboard":
            return self._replay_clipboard_copy(kwargs)
        if action_name == "Paste text from clipboard":
            return self._replay_clipboard_paste(page, kwargs)

        logger.error(f"CustomControllerSync.execute received unhandled action_name: '{action_name}'. This controller only directly handles specific actions for replay.")
        # NOTE: A fallback to self.registry.execute_action was considered but is
        # not implemented; it requires a careful review of whether the
        # browser_use registry expects a sync or async browser context.
        raise NotImplementedError(f"CustomControllerSync.execute does not handle action: '{action_name}'.")

    def _replay_upload(self, page: SyncPage, kwargs: dict) -> ActionResult:
        """Set the files of the input matched by ``selector`` to ``file_path``."""
        selector = kwargs.get("selector")
        file_path = kwargs.get("file_path")
        if not selector or not file_path:
            logger.error(f"Missing selector or file_path for 'Upload local file'. Got: selector='{selector}', file_path='{file_path}'")
            raise ValueError("Selector and file_path are required for Upload local file action.")

        logger.info(f"Directly executing 'Upload local file': selector='{selector}', file_path='{file_path}'")
        try:
            page.locator(selector).set_input_files(file_path)  # SYNC
            logger.info(f"Successfully set input files for selector '{selector}' with path '{file_path}'")
            return ActionResult(extracted_content=f"Uploaded {Path(file_path).name} to {selector}")
        except Exception as e:
            logger.error(f"Error during direct execution of 'Upload local file': {e}", exc_info=True)
            raise

    def _replay_download(self, kwargs: dict) -> ActionResult:
        """Replay a download by copying the recorded local file into ``dest_dir``."""
        suggested_filename = kwargs.get("suggested_filename")
        recorded_local_path = kwargs.get("recorded_local_path")
        dest_dir_str = kwargs.get("dest_dir", "~/Downloads")

        if not recorded_local_path:
            logger.error("Missing 'recorded_local_path' for 'Download remote file' action. Cannot replay download.")
            raise ValueError("recorded_local_path is required to replay a download.")
        if not suggested_filename:
            suggested_filename = Path(recorded_local_path).name
            logger.warning(f"Missing 'suggested_filename' in download event, using name from recorded_local_path: {suggested_filename}")

        source_path = Path(recorded_local_path)
        if not source_path.exists():
            logger.error(f"Recorded local file for download does not exist: '{source_path}'")
            raise FileNotFoundError(f"Recorded file for download replay not found: {source_path}")

        dest_dir = Path(dest_dir_str).expanduser()
        dest_dir.mkdir(parents=True, exist_ok=True)
        final_dest_path = dest_dir / suggested_filename

        original_url_for_logging = kwargs.get("url", "N/A")
        logger.info(f"Replaying 'Download remote file' (original URL: {original_url_for_logging}): Copying '{source_path}' to '{final_dest_path}'")
        try:
            shutil.copy(str(source_path), str(final_dest_path))
            logger.info(f"Successfully replayed download by copying to '{final_dest_path}'")
            return ActionResult(extracted_content=str(final_dest_path))
        except shutil.SameFileError:
            # The recorded file already is the destination file, so the
            # desired end state holds and there is nothing to copy.
            logger.info(f"Recorded file is already at '{final_dest_path}'; nothing to copy.")
            return ActionResult(extracted_content=str(final_dest_path))
        except Exception as e:
            logger.error(f"Error replaying 'Download remote file' by copying: {e}", exc_info=True)
            raise

    def _replay_clipboard_copy(self, kwargs: dict) -> ActionResult:
        """Put ``text`` (stringified; ``None`` becomes ``""``) on the system clipboard."""
        text = kwargs.get("text")
        if text is None:  # Should ideally not be Python None if recorder sends "" for empty
            text = ""
            logger.warning("'text' for 'Copy text to clipboard' was None, defaulting to empty string.")

        logger.info(f"Executing 'Copy text to clipboard' for text (first 30 chars): '{str(text)[:30]}'")
        try:
            pyperclip.copy(str(text))  # Ensure text is string for pyperclip
            return ActionResult(extracted_content=str(text))
        except Exception as e:
            logger.error(f"Error during execution of 'Copy text to clipboard': {e}", exc_info=True)
            raise

    def _replay_clipboard_paste(self, page: SyncPage, kwargs: dict) -> ActionResult:
        """Optionally focus the recorded selector, then press the OS paste shortcut."""
        selector = kwargs.get("selector")
        logger.info(f"Attempting to execute 'Paste text from clipboard'. Recorded selector: '{selector}'")
        try:
            if selector and selector not in self._PROBLEMATIC_PASTE_SELECTORS:
                self._focus_paste_target(page, selector)

            # Determine the correct paste shortcut for the host OS.
            paste_keys = "Meta+V" if platform.system() == "Darwin" else "Control+V"
            focused_html = self._describe_focused_element(page)
            logger.info(f"Simulating paste action using keys: '{paste_keys}'. Focused element before paste: {focused_html}")
            page.keyboard.press(paste_keys)

            # Confirm what was on clipboard, for logging purposes
            text_that_was_on_clipboard = pyperclip.paste()
            logger.info(f"Paste action simulated. Text on clipboard was: '{text_that_was_on_clipboard[:50]}...'")
            return ActionResult(extracted_content=f"Pasted from clipboard. Content was: {text_that_was_on_clipboard[:30]}...")
        except Exception as e:
            logger.error(f"Error during 'Paste text from clipboard': {e}", exc_info=True)
            raise

    @staticmethod
    def _focus_paste_target(page: SyncPage, selector: str) -> None:
        """Best-effort focus of ``selector``; failures are logged, never raised."""
        try:
            target_element = page.locator(selector).first
            is_inputtable = target_element.evaluate(
                "el => el.tagName === 'INPUT' || el.tagName === 'TEXTAREA' || el.isContentEditable"
            )
            if is_inputtable:
                target_element.wait_for(state="visible", timeout=3000)
                target_element.focus(timeout=1000)
                logger.info(f"Successfully focused on selector '{selector}' for pasting.")
            else:
                logger.warning(f"Selector '{selector}' is not an inputtable element. Will attempt to paste via keyboard shortcut into general focus.")
        except Exception as e_focus:
            logger.warning(f"Could not focus on selector '{selector}' for paste: {e_focus}. Will attempt to paste via keyboard shortcut into general focus.")

    @staticmethod
    def _describe_focused_element(page: SyncPage) -> Any:
        """Return up to 100 chars of the focused element's outerHTML for logging.

        This is diagnostic only, so it must never abort the paste: the
        expression tolerates a null ``document.activeElement`` and any
        evaluation error is downgraded to a debug log entry.
        """
        try:
            return page.evaluate("document.activeElement?.outerHTML?.slice(0, 100)")
        except Exception as e_diag:
            logger.debug(f"Could not inspect focused element before paste: {e_diag}")
            return "<unavailable>"