import logging
import pdb
import platform  # For OS detection for paste shortcut
import shutil
from pathlib import Path
from typing import Any, Optional, Type

import pyperclip
from pydantic import BaseModel
from browser_use.agent.views import ActionResult
# NOTE: the BaseBrowserContext import (browser_use.browser.context.BrowserContext)
# was dropped together with the registered actions that used it. Restore it if
# the base Controller class or a type hint turns out to need it.
from browser_use.controller.service import Controller
# Sync Playwright imports
from playwright.sync_api import (
    Page as SyncPage,
    BrowserContext as SyncBrowserContext,
    TimeoutError as SyncPlaywrightTimeoutError,
)
from main_content_extractor import MainContentExtractor
from browser_use.controller.views import (
    ClickElementAction,
    DoneAction,  # Keep if used by base or other parts
    ExtractPageContentAction,
    GoToUrlAction,
    InputTextAction,
    OpenTabAction,
    ScrollAction,
    SearchGoogleAction,
    SendKeysAction,
    SwitchTabAction,
)

logger = logging.getLogger(__name__)


class CustomControllerSync(Controller):
    """A controller that extends browser_use controller functionality using Sync Playwright API.

    ``execute`` directly handles four replay actions, selected by name:
    "Upload local file", "Download remote file", "Copy text to clipboard"
    and "Paste text from clipboard". Any other action name raises
    ``NotImplementedError``.
    """

    # Selectors that are known not to be interactive; focusing them before a
    # paste is skipped. Add other known non-interactive selectors if needed.
    _NON_FOCUSABLE_SELECTORS = ("br",)

    def __init__(
        self,
        page: SyncPage,
        exclude_actions: Optional[list[str]] = None,
        output_model: Optional[Type[BaseModel]] = None,
    ):
        """Create a controller bound to one synchronous Playwright page.

        Args:
            page: The synchronous Playwright page all actions run against.
            exclude_actions: Action names forwarded to the base controller.
                ``None`` (the default) is forwarded as a fresh empty list so
                that no list object is shared between instances.
            output_model: Optional pydantic model forwarded to the base
                controller.
        """
        super().__init__(
            exclude_actions=[] if exclude_actions is None else exclude_actions,
            output_model=output_model,
        )
        self.page = page  # Store the synchronous page
        self.browser_context = page.context  # Store its context (which is also sync)

    def execute(self, action_name: str, **kwargs: Any) -> ActionResult:
        """Run one recorded action synchronously against the stored page.

        Args:
            action_name: Name of the action to replay.
            **kwargs: Action-specific arguments (see the individual helpers).

        Returns:
            The ``ActionResult`` produced by the matching helper.

        Raises:
            RuntimeError: If ``self.page`` is not set.
            NotImplementedError: If ``action_name`` is not one of the actions
                handled here.
            Exception: Any error raised while performing the action is logged
                and re-raised unchanged.
        """
        logger.debug(f"CustomControllerSync.execute CALLED for action: '{action_name}', args: {kwargs}")

        page = self.page  # Use the stored synchronous page
        if not page:
            logger.error("CustomControllerSync.execute: self.page is not set (should have been in __init__).")
            raise RuntimeError("Failed to get current page for action execution.")

        handlers = {
            "Upload local file": self._replay_upload,
            "Download remote file": self._replay_download,
            "Copy text to clipboard": self._replay_clipboard_copy,
            "Paste text from clipboard": self._replay_clipboard_paste,
        }
        handler = handlers.get(action_name)
        if handler is None:
            logger.error(f"CustomControllerSync.execute received unhandled action_name: '{action_name}'. This controller only directly handles specific actions for replay.")
            # NOTE: a fallback to self.registry.execute_action is deliberately
            # not wired in. Whether the browser_use registry expects a sync or
            # async browser context needs careful review before enabling it.
            raise NotImplementedError(f"CustomControllerSync.execute does not handle action: '{action_name}'.")
        return handler(page, kwargs)

    def _replay_upload(self, page: SyncPage, kwargs: dict[str, Any]) -> ActionResult:
        """Set ``kwargs['file_path']`` on the file input matched by ``kwargs['selector']``."""
        selector = kwargs.get("selector")
        file_path = kwargs.get("file_path")
        if not selector or not file_path:
            logger.error(f"Missing selector or file_path for 'Upload local file'. Got: selector='{selector}', file_path='{file_path}'")
            raise ValueError("Selector and file_path are required for Upload local file action.")

        logger.info(f"Directly executing 'Upload local file': selector='{selector}', file_path='{file_path}'")
        try:
            page.locator(selector).set_input_files(file_path)  # SYNC
            logger.info(f"Successfully set input files for selector '{selector}' with path '{file_path}'")
            return ActionResult(extracted_content=f"Uploaded {Path(file_path).name} to {selector}")
        except Exception as e:
            logger.error(f"Error during direct execution of 'Upload local file': {e}", exc_info=True)
            raise

    def _replay_download(self, page: SyncPage, kwargs: dict[str, Any]) -> ActionResult:
        """Replay a download by copying the recorded local file into ``dest_dir``.

        Nothing is fetched from the network: the file at
        ``kwargs['recorded_local_path']`` is copied to
        ``kwargs['dest_dir']`` (default ``~/Downloads``) under
        ``kwargs['suggested_filename']``, falling back to the recorded file's
        own name. ``page`` is unused; it is accepted so all helpers share one
        call shape.
        """
        suggested_filename = kwargs.get("suggested_filename")
        recorded_local_path = kwargs.get("recorded_local_path")
        dest_dir_str = kwargs.get("dest_dir", "~/Downloads")

        if not recorded_local_path:
            logger.error(f"Missing 'recorded_local_path' for 'Download remote file' action. Cannot replay download.")
            raise ValueError("recorded_local_path is required to replay a download.")

        if not suggested_filename:
            suggested_filename = Path(recorded_local_path).name
            logger.warning(f"Missing 'suggested_filename' in download event, using name from recorded_local_path: {suggested_filename}")

        source_path = Path(recorded_local_path)
        if not source_path.exists():
            logger.error(f"Recorded local file for download does not exist: '{source_path}'")
            raise FileNotFoundError(f"Recorded file for download replay not found: {source_path}")

        dest_dir = Path(dest_dir_str).expanduser()
        dest_dir.mkdir(parents=True, exist_ok=True)
        final_dest_path = dest_dir / suggested_filename

        original_url_for_logging = kwargs.get("url", "N/A")
        logger.info(f"Replaying 'Download remote file' (original URL: {original_url_for_logging}): Copying '{source_path}' to '{final_dest_path}'")
        try:
            shutil.copy(str(source_path), str(final_dest_path))
            logger.info(f"Successfully replayed download by copying to '{final_dest_path}'")
            return ActionResult(extracted_content=str(final_dest_path))
        except Exception as e:
            logger.error(f"Error replaying 'Download remote file' by copying: {e}", exc_info=True)
            raise

    def _replay_clipboard_copy(self, page: SyncPage, kwargs: dict[str, Any]) -> ActionResult:
        """Put ``kwargs['text']`` on the system clipboard via pyperclip.

        A missing or ``None`` text is treated as the empty string. ``page`` is
        unused; it is accepted so all helpers share one call shape.
        """
        text = kwargs.get("text")
        if text is None:  # Should ideally not be Python None if recorder sends "" for empty
            text = ""
            logger.warning("'text' for 'Copy text to clipboard' was None, defaulting to empty string.")

        logger.info(f"Executing 'Copy text to clipboard' for text (first 30 chars): '{str(text)[:30]}'")
        try:
            pyperclip.copy(str(text))  # Ensure text is string for pyperclip
            return ActionResult(extracted_content=str(text))
        except Exception as e:
            logger.error(f"Error during execution of 'Copy text to clipboard': {e}", exc_info=True)
            raise

    def _focus_paste_target(self, page: SyncPage, selector: str) -> None:
        """Best-effort focus of ``selector`` before pasting.

        Only INPUT, TEXTAREA or content-editable elements are focused. Any
        failure is logged as a warning and swallowed on purpose, so the paste
        still goes to whatever element currently has focus.
        """
        try:
            target_element = page.locator(selector).first
            is_inputtable = target_element.evaluate(
                "el => el.tagName === 'INPUT' || el.tagName === 'TEXTAREA' || el.isContentEditable"
            )
            if is_inputtable:
                target_element.wait_for(state="visible", timeout=3000)
                target_element.focus(timeout=1000)
                logger.info(f"Successfully focused on selector '{selector}' for pasting.")
            else:
                logger.warning(f"Selector '{selector}' is not an inputtable element. Will attempt to paste via keyboard shortcut into general focus.")
        except Exception as e_focus:
            logger.warning(f"Could not focus on selector '{selector}' for paste: {e_focus}. Will attempt to paste via keyboard shortcut into general focus.")

    def _replay_clipboard_paste(self, page: SyncPage, kwargs: dict[str, Any]) -> ActionResult:
        """Paste the clipboard into the page using the platform paste shortcut.

        If ``kwargs['selector']`` is given and not a known non-interactive
        selector, the element is focused first (best effort). The shortcut is
        ``Meta+V`` on macOS and ``Control+V`` elsewhere.
        """
        selector = kwargs.get("selector")
        logger.info(f"Attempting to execute 'Paste text from clipboard'. Recorded selector: '{selector}'")
        try:
            if selector and selector not in self._NON_FOCUSABLE_SELECTORS:
                self._focus_paste_target(page, selector)

            # Determine the correct paste shortcut
            paste_keys = "Meta+V" if platform.system() == "Darwin" else "Control+V"

            # document.activeElement may be null; optional chaining keeps this
            # purely diagnostic lookup from aborting the paste itself.
            focused_html = page.evaluate("document.activeElement?.outerHTML?.slice(0,100)")
            logger.info(f"Simulating paste action using keys: '{paste_keys}'. Focused element before paste: {focused_html}")
            page.keyboard.press(paste_keys)

            # Confirm what was on clipboard, for logging purposes
            clipboard_text = pyperclip.paste()
            logger.info(f"Paste action simulated. Text on clipboard was: '{clipboard_text[:50]}...'")
            return ActionResult(extracted_content=f"Pasted from clipboard. Content was: {clipboard_text[:30]}...")
        except Exception as e:
            logger.error(f"Error during 'Paste text from clipboard': {e}", exc_info=True)
            raise