Spaces:
Sleeping
Sleeping
import pdb | |
import pyperclip | |
from pathlib import Path | |
import shutil | |
from typing import Optional, Type, Any | |
from pydantic import BaseModel | |
from browser_use.agent.views import ActionResult | |
# BaseBrowserContext might still be needed if base Controller class uses it, or for type hints if some methods expect it. | |
# For now, if only used by removed registered actions, it can be removed. Let's assume it can be removed for now. | |
# from browser_use.browser.context import BrowserContext as BaseBrowserContext | |
from browser_use.controller.service import Controller | |
# Sync Playwright imports | |
from playwright.sync_api import Page as SyncPage, BrowserContext as SyncBrowserContext, TimeoutError as SyncPlaywrightTimeoutError # ADDED/MODIFIED | |
from main_content_extractor import MainContentExtractor | |
from browser_use.controller.views import ( | |
ClickElementAction, | |
DoneAction, # Keep if used by base or other parts | |
ExtractPageContentAction, | |
GoToUrlAction, | |
InputTextAction, | |
OpenTabAction, | |
ScrollAction, | |
SearchGoogleAction, | |
SendKeysAction, | |
SwitchTabAction, | |
) | |
import logging | |
import platform # For OS detection for paste shortcut | |
logger = logging.getLogger(__name__) | |
class CustomControllerSync(Controller):
    """Controller that replays a fixed set of recorded actions via the Sync Playwright API.

    Only the following action names are handled directly by :meth:`execute`:

    * ``"Upload local file"``
    * ``"Download remote file"``
    * ``"Copy text to clipboard"``
    * ``"Paste text from clipboard"``

    Any other action name raises ``NotImplementedError``.
    """

    # Selectors known to be non-interactive; focusing them before a paste is
    # pointless, so the paste goes to whatever element currently has focus.
    _NON_INTERACTIVE_SELECTORS = ("br",)

    def __init__(
        self,
        page: SyncPage,
        exclude_actions: Optional[list[str]] = None,
        output_model: Optional[Type[BaseModel]] = None,
    ):
        """Store the synchronous page and initialise the base controller.

        Args:
            page: Synchronous Playwright page every action is executed against.
            exclude_actions: Action names forwarded to the base controller.
                ``None`` (the default) is forwarded as a fresh empty list so no
                list object is shared between controller instances.
            output_model: Optional output model forwarded to the base controller.
        """
        super().__init__(
            exclude_actions=[] if exclude_actions is None else exclude_actions,
            output_model=output_model,
        )
        self.page = page
        # The context of a sync page is itself a sync BrowserContext.
        self.browser_context = page.context

    def execute(self, action_name: str, **kwargs: Any) -> ActionResult:
        """Execute one recorded action synchronously against ``self.page``.

        Args:
            action_name: Name of the recorded action to replay.
            **kwargs: Action-specific parameters (see the private helpers).

        Returns:
            The ``ActionResult`` produced by the matching action handler.

        Raises:
            RuntimeError: If ``self.page`` is not set.
            ValueError: If a required action parameter is missing.
            FileNotFoundError: If the recorded download file does not exist.
            NotImplementedError: If ``action_name`` is not one of the handled actions.
            Exception: Any error raised while performing the action is logged
                and re-raised unchanged.
        """
        logger.debug(f"CustomControllerSync.execute CALLED for action: '{action_name}', args: {kwargs}")

        page = self.page
        if not page:
            logger.error("CustomControllerSync.execute: self.page is not set (should have been in __init__).")
            raise RuntimeError("Failed to get current page for action execution.")

        if action_name == "Upload local file":
            return self._upload_local_file(page, kwargs)
        if action_name == "Download remote file":
            return self._replay_download(kwargs)
        if action_name == "Copy text to clipboard":
            return self._copy_text_to_clipboard(kwargs)
        if action_name == "Paste text from clipboard":
            return self._paste_text_from_clipboard(page, kwargs)

        # NOTE: no fallback to self.registry.execute_action is attempted here.
        # Whether the browser_use registry can be driven with a sync browser
        # context needs review before such a fallback is added.
        logger.error(f"CustomControllerSync.execute received unhandled action_name: '{action_name}'. This controller only directly handles specific actions for replay.")
        raise NotImplementedError(f"CustomControllerSync.execute does not handle action: '{action_name}'.")

    def _upload_local_file(self, page: SyncPage, params: dict[str, Any]) -> ActionResult:
        """Set ``params['file_path']`` on the file input matched by ``params['selector']``."""
        selector = params.get("selector")
        file_path = params.get("file_path")
        if not selector or not file_path:
            logger.error(f"Missing selector or file_path for 'Upload local file'. Got: selector='{selector}', file_path='{file_path}'")
            raise ValueError("Selector and file_path are required for Upload local file action.")

        logger.info(f"Directly executing 'Upload local file': selector='{selector}', file_path='{file_path}'")
        try:
            page.locator(selector).set_input_files(file_path)
            logger.info(f"Successfully set input files for selector '{selector}' with path '{file_path}'")
            return ActionResult(extracted_content=f"Uploaded {Path(file_path).name} to {selector}")
        except Exception as e:
            logger.error(f"Error during direct execution of 'Upload local file': {e}", exc_info=True)
            raise

    def _replay_download(self, params: dict[str, Any]) -> ActionResult:
        """Replay a recorded download by copying the recorded file into the destination directory.

        No network request is made: the file at ``params['recorded_local_path']``
        is copied to ``params['dest_dir']`` (default ``~/Downloads``).
        """
        suggested_filename = params.get("suggested_filename")
        recorded_local_path = params.get("recorded_local_path")
        dest_dir_str = params.get("dest_dir", "~/Downloads")

        if not recorded_local_path:
            logger.error("Missing 'recorded_local_path' for 'Download remote file' action. Cannot replay download.")
            raise ValueError("recorded_local_path is required to replay a download.")

        source_path = Path(recorded_local_path)
        if not suggested_filename:
            # Fall back to the recorded file's own name.
            suggested_filename = source_path.name
            logger.warning(f"Missing 'suggested_filename' in download event, using name from recorded_local_path: {suggested_filename}")

        if not source_path.exists():
            logger.error(f"Recorded local file for download does not exist: '{source_path}'")
            raise FileNotFoundError(f"Recorded file for download replay not found: {source_path}")

        dest_dir = Path(dest_dir_str).expanduser()
        dest_dir.mkdir(parents=True, exist_ok=True)
        final_dest_path = dest_dir / suggested_filename

        # The original URL is only used for logging.
        original_url_for_logging = params.get("url", "N/A")
        logger.info(f"Replaying 'Download remote file' (original URL: {original_url_for_logging}): Copying '{source_path}' to '{final_dest_path}'")
        try:
            shutil.copy(str(source_path), str(final_dest_path))
            logger.info(f"Successfully replayed download by copying to '{final_dest_path}'")
            return ActionResult(extracted_content=str(final_dest_path))
        except Exception as e:
            logger.error(f"Error replaying 'Download remote file' by copying: {e}", exc_info=True)
            raise

    def _copy_text_to_clipboard(self, params: dict[str, Any]) -> ActionResult:
        """Put ``params['text']`` on the system clipboard (``None`` becomes ``""``)."""
        text = params.get("text")
        if text is None:
            text = ""
            logger.warning("'text' for 'Copy text to clipboard' was None, defaulting to empty string.")

        # pyperclip needs a string, so coerce whatever was recorded.
        text = str(text)
        logger.info(f"Executing 'Copy text to clipboard' for text (first 30 chars): '{text[:30]}'")
        try:
            pyperclip.copy(text)
            return ActionResult(extracted_content=text)
        except Exception as e:
            logger.error(f"Error during execution of 'Copy text to clipboard': {e}", exc_info=True)
            raise

    def _focus_paste_target(self, page: SyncPage, selector: Optional[str]) -> None:
        """Best-effort: focus the recorded paste target if it is an inputtable element.

        Failures are logged as warnings and swallowed on purpose, so the paste
        still goes to whichever element currently has focus.
        """
        if not selector or selector in self._NON_INTERACTIVE_SELECTORS:
            return
        try:
            target_element = page.locator(selector).first
            is_inputtable = target_element.evaluate(
                "el => el.tagName === 'INPUT' || el.tagName === 'TEXTAREA' || el.isContentEditable"
            )
            if not is_inputtable:
                logger.warning(f"Selector '{selector}' is not an inputtable element. Will attempt to paste via keyboard shortcut into general focus.")
                return
            target_element.wait_for(state="visible", timeout=3000)
            target_element.focus(timeout=1000)
            logger.info(f"Successfully focused on selector '{selector}' for pasting.")
        except Exception as e_focus:
            logger.warning(f"Could not focus on selector '{selector}' for paste: {e_focus}. Will attempt to paste via keyboard shortcut into general focus.")

    def _paste_text_from_clipboard(self, page: SyncPage, params: dict[str, Any]) -> ActionResult:
        """Simulate the OS paste shortcut, focusing ``params['selector']`` first when possible."""
        selector = params.get("selector")
        logger.info(f"Attempting to execute 'Paste text from clipboard'. Recorded selector: '{selector}'")
        try:
            self._focus_paste_target(page, selector)

            # macOS pastes with Cmd+V; every other platform uses Ctrl+V.
            paste_keys = "Meta+V" if platform.system() == "Darwin" else "Control+V"

            # Diagnostic only. The optional chaining sits on activeElement so a
            # null active element cannot abort the paste from inside a log call.
            focused_html = page.evaluate("document.activeElement?.outerHTML?.slice(0,100)")
            logger.info(f"Simulating paste action using keys: '{paste_keys}'. Focused element before paste: {focused_html}")
            page.keyboard.press(paste_keys)

            # Read the clipboard back purely to report what was pasted.
            text_that_was_on_clipboard = pyperclip.paste()
            logger.info(f"Paste action simulated. Text on clipboard was: '{text_that_was_on_clipboard[:50]}...'")
            return ActionResult(extracted_content=f"Pasted from clipboard. Content was: {text_that_was_on_clipboard[:30]}...")
        except Exception as e:
            logger.error(f"Error during 'Paste text from clipboard': {e}", exc_info=True)
            raise