File size: 9,523 Bytes
94ff58a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import pdb
import pyperclip
from pathlib import Path
import shutil
from typing import Optional, Type, Any 
from pydantic import BaseModel
from browser_use.agent.views import ActionResult
# BaseBrowserContext might still be needed if base Controller class uses it, or for type hints if some methods expect it.
# For now, if only used by removed registered actions, it can be removed. Let's assume it can be removed for now.
# from browser_use.browser.context import BrowserContext as BaseBrowserContext
from browser_use.controller.service import Controller 
# Sync Playwright imports
from playwright.sync_api import Page as SyncPage, BrowserContext as SyncBrowserContext, TimeoutError as SyncPlaywrightTimeoutError # ADDED/MODIFIED

from main_content_extractor import MainContentExtractor
from browser_use.controller.views import (
    ClickElementAction,
    DoneAction, # Keep if used by base or other parts
    ExtractPageContentAction,
    GoToUrlAction,
    InputTextAction,
    OpenTabAction,
    ScrollAction,
    SearchGoogleAction,
    SendKeysAction,
    SwitchTabAction,
)
import logging
import platform # For OS detection for paste shortcut

logger = logging.getLogger(__name__)

class CustomControllerSync(Controller):
    """A controller that extends browser_use controllerfunctionality using Sync Playwright API."""
    
    def __init__(self, 
                 page: SyncPage, # CHANGED: Expects a SyncPage instance
                 exclude_actions: list[str] = [],
                 output_model: Optional[Type[BaseModel]] = None
                 ):
        super().__init__(exclude_actions=exclude_actions, output_model=output_model)
        self.page = page # Store the synchronous page
        self.browser_context = page.context # Store its context (which is also sync)
        # self._register_custom_actions() # Removed call

    def execute(self, action_name: str, **kwargs): # SYNC
        logger.debug(f"CustomControllerSync.execute CALLED for action: '{action_name}', args: {kwargs}")
        page = self.page # Use the stored synchronous page
        if not page:
            logger.error("CustomControllerSync.execute: self.page is not set (should have been in __init__).")
            raise RuntimeError("Failed to get current page for action execution.")

        if action_name == "Upload local file":
            selector = kwargs.get("selector")
            file_path = kwargs.get("file_path")
            if not selector or not file_path:
                logger.error(f"Missing selector or file_path for 'Upload local file'. Got: selector='{selector}', file_path='{file_path}'")
                raise ValueError("Selector and file_path are required for Upload local file action.")
            
            logger.info(f"Directly executing 'Upload local file': selector='{selector}', file_path='{file_path}'")
            try:
                page.locator(selector).set_input_files(file_path) # SYNC
                logger.info(f"Successfully set input files for selector '{selector}' with path '{file_path}'")
                return ActionResult(extracted_content=f"Uploaded {Path(file_path).name} to {selector}")
            except Exception as e:
                logger.error(f"Error during direct execution of 'Upload local file': {e}", exc_info=True)
                raise
        
        elif action_name == "Download remote file":
            suggested_filename = kwargs.get("suggested_filename")
            recorded_local_path = kwargs.get("recorded_local_path")
            dest_dir_str = kwargs.get("dest_dir", "~/Downloads") 

            if not recorded_local_path:
                logger.error(f"Missing 'recorded_local_path' for 'Download remote file' action. Cannot replay download.")
                raise ValueError("recorded_local_path is required to replay a download.")
            
            if not suggested_filename:
                suggested_filename = Path(recorded_local_path).name 
                logger.warning(f"Missing 'suggested_filename' in download event, using name from recorded_local_path: {suggested_filename}")

            source_path = Path(recorded_local_path)
            if not source_path.exists():
                logger.error(f"Recorded local file for download does not exist: '{source_path}'")
                raise FileNotFoundError(f"Recorded file for download replay not found: {source_path}")

            dest_dir = Path(dest_dir_str).expanduser()
            dest_dir.mkdir(parents=True, exist_ok=True)
            final_dest_path = dest_dir / suggested_filename

            original_url_for_logging = kwargs.get("url", "N/A")
            logger.info(f"Replaying 'Download remote file' (original URL: {original_url_for_logging}): Copying '{source_path}' to '{final_dest_path}'")
            try:
                shutil.copy(str(source_path), str(final_dest_path))
                logger.info(f"Successfully replayed download by copying to '{final_dest_path}'")
                return ActionResult(extracted_content=str(final_dest_path))
            except Exception as e:
                logger.error(f"Error replaying 'Download remote file' by copying: {e}", exc_info=True)
                raise

        elif action_name == "Copy text to clipboard":
            text = kwargs.get("text")
            if text is None: # Should ideally not be Python None if recorder sends "" for empty
                text = "" # Default to empty string if it was None
                logger.warning("'text' for 'Copy text to clipboard' was None, defaulting to empty string.")
            logger.info(f"Executing 'Copy text to clipboard' for text (first 30 chars): '{str(text)[:30]}'")
            try:
                pyperclip.copy(str(text)) # Ensure text is string for pyperclip
                return ActionResult(extracted_content=str(text))
            except Exception as e:
                logger.error(f"Error during execution of 'Copy text to clipboard': {e}", exc_info=True)
                raise
        
        elif action_name == "Paste text from clipboard":
            selector = kwargs.get("selector")
            problematic_selectors = ["br"] # Add other known non-interactive selectors if needed
            logger.info(f"Attempting to execute 'Paste text from clipboard'. Recorded selector: '{selector}'")
            
            try:
                can_focus_selector = False
                if selector and selector not in problematic_selectors:
                    try:
                        target_element = page.locator(selector).first
                        is_inputtable = target_element.evaluate(
                            "el => el.tagName === 'INPUT' || el.tagName === 'TEXTAREA' || el.isContentEditable"
                        )
                        if is_inputtable:
                            target_element.wait_for(state="visible", timeout=3000)
                            target_element.focus(timeout=1000)
                            logger.info(f"Successfully focused on selector '{selector}' for pasting.")
                            can_focus_selector = True
                        else:
                            logger.warning(f"Selector '{selector}' is not an inputtable element. Will attempt to paste via keyboard shortcut into general focus.")
                    except Exception as e_focus:
                        logger.warning(f"Could not focus on selector '{selector}' for paste: {e_focus}. Will attempt to paste via keyboard shortcut into general focus.")
                
                # Determine the correct paste shortcut
                paste_keys = "Meta+V" if platform.system() == "Darwin" else "Control+V"

                logger.info(f"Simulating paste action using keys: '{paste_keys}'. Focused element before paste: {page.evaluate('document.activeElement.outerHTML?.slice(0,100)')}")
                page.keyboard.press(paste_keys)
                
                # Confirm what was on clipboard, for logging purposes
                text_that_was_on_clipboard = pyperclip.paste()
                logger.info(f"Paste action simulated. Text on clipboard was: '{text_that_was_on_clipboard[:50]}...'")
                
                return ActionResult(extracted_content=f"Pasted from clipboard. Content was: {text_that_was_on_clipboard[:30]}...")

            except Exception as e:
                logger.error(f"Error during 'Paste text from clipboard': {e}", exc_info=True)
                raise
        else:
            logger.error(f"CustomControllerSync.execute received unhandled action_name: '{action_name}'. This controller only directly handles specific actions for replay.")
            # Fallback to registry if needed (this part requires careful review of browser_use registry's sync/async nature)
            # if hasattr(self.registry, 'execute_action') and callable(self.registry.execute_action):
            #     logger.info(f"Attempting fallback to self.registry.execute_action for '{action_name}'")
            #     # How self.registry.execute_action expects browser_context (sync/async) is key here.
            #     # Assuming it might need a wrapper or the BaseBrowserContext if that's what it's typed for.
            #     # This is a placeholder and might need adjustment based on browser_use library.
            #     return self.registry.execute_action(action_name, params={'browser': self.browser_context, **kwargs}) 
            raise NotImplementedError(f"CustomControllerSync.execute does not handle action: '{action_name}'.")