"""Selenium web scraping module.""" from __future__ import annotations import logging import asyncio from pathlib import Path from sys import platform from bs4 import BeautifulSoup from webdriver_manager.chrome import ChromeDriverManager from webdriver_manager.firefox import GeckoDriverManager from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options as ChromeOptions from selenium.webdriver.common.by import By from selenium.webdriver.firefox.options import Options as FirefoxOptions from selenium.webdriver.remote.webdriver import WebDriver from selenium.webdriver.safari.options import Options as SafariOptions from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from fastapi import WebSocket import processing.text as summary from config import Config from processing.html import extract_hyperlinks, format_hyperlinks from concurrent.futures import ThreadPoolExecutor executor = ThreadPoolExecutor() FILE_DIR = Path(__file__).parent.parent CFG = Config() async def async_browse(url: str, question: str, websocket: WebSocket) -> str: """Browse a website and return the answer and links to the user Args: url (str): The url of the website to browse question (str): The question asked by the user websocket (WebSocketManager): The websocket manager Returns: str: The answer and links to the user """ loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=8) print(f"Scraping url {url} with question {question}") await websocket.send_json( {"type": "logs", "output": f"🔎 Browsing the {url} for relevant about: {question}..."}) try: driver, text = await loop.run_in_executor(executor, scrape_text_with_selenium, url) await loop.run_in_executor(executor, add_header, driver) summary_text = await loop.run_in_executor(executor, summary.summarize_text, url, text, question, driver) await websocket.send_json( {"type": "logs", "output": f"📝 Information gathered from url {url}: {summary_text}"}) return f"Information gathered from url {url}: {summary_text}" except Exception as e: print(f"An error occurred while processing the url {url}: {e}") return f"Error processing the url {url}: {e}" def browse_website(url: str, question: str) -> tuple[str, WebDriver]: """Browse a website and return the answer and links to the user Args: url (str): The url of the website to browse question (str): The question asked by the user Returns: Tuple[str, WebDriver]: The answer and links to the user and the webdriver """ if not url: return "A URL was not specified, cancelling request to browse website.", None driver, text = scrape_text_with_selenium(url) add_header(driver) summary_text = summary.summarize_text(url, text, question, driver) links = scrape_links_with_selenium(driver, url) # Limit links to 5 if len(links) > 5: links = links[:5] # write_to_file('research-{0}.txt'.format(url), summary_text + "\nSource Links: {0}\n\n".format(links)) close_browser(driver) return f"Answer gathered from website: {summary_text} \n \n Links: {links}", driver def scrape_text_with_selenium(url: str) -> tuple[WebDriver, str]: """Scrape text from a website using selenium Args: url (str): The url of the website to scrape Returns: Tuple[WebDriver, str]: The webdriver and the text scraped from the website """ logging.getLogger("selenium").setLevel(logging.CRITICAL) options_available = { "chrome": ChromeOptions, "safari": SafariOptions, "firefox": FirefoxOptions, } options = options_available[CFG.selenium_web_browser]() options.add_argument(f"user-agent={CFG.user_agent}") options.add_argument('--headless') options.add_argument("--enable-javascript") if CFG.selenium_web_browser == "firefox": service = Service(executable_path=GeckoDriverManager().install()) driver = webdriver.Firefox( service=service, options=options ) elif CFG.selenium_web_browser == "safari": # Requires a bit more setup on the users end # See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari driver = webdriver.Safari(options=options) else: if platform == "linux" or platform == "linux2": options.add_argument("--disable-dev-shm-usage") options.add_argument("--remote-debugging-port=9222") options.add_argument("--no-sandbox") options.add_experimental_option( "prefs", {"download_restrictions": 3} ) driver = webdriver.Chrome(options=options) print(f"scraping url {url}...") driver.get(url) WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) # Get the HTML content directly from the browser's DOM page_source = driver.execute_script("return document.body.outerHTML;") soup = BeautifulSoup(page_source, "html.parser") for script in soup(["script", "style"]): script.extract() # text = soup.get_text() text = get_text(soup) lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) text = "\n".join(chunk for chunk in chunks if chunk) return driver, text def get_text(soup): """Get the text from the soup Args: soup (BeautifulSoup): The soup to get the text from Returns: str: The text from the soup """ text = "" tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'p'] for element in soup.find_all(tags): # Find all the
elements text += element.text + "\n\n" return text def scrape_links_with_selenium(driver: WebDriver, url: str) -> list[str]: """Scrape links from a website using selenium Args: driver (WebDriver): The webdriver to use to scrape the links Returns: List[str]: The links scraped from the website """ page_source = driver.page_source soup = BeautifulSoup(page_source, "html.parser") for script in soup(["script", "style"]): script.extract() hyperlinks = extract_hyperlinks(soup, url) return format_hyperlinks(hyperlinks) def close_browser(driver: WebDriver) -> None: """Close the browser Args: driver (WebDriver): The webdriver to close Returns: None """ driver.quit() def add_header(driver: WebDriver) -> None: """Add a header to the website Args: driver (WebDriver): The webdriver to use to add the header Returns: None """ driver.execute_script(open(f"{FILE_DIR}/js/overlay.js", "r").read())