"""Selenium web scraping module.""" | |
from __future__ import annotations | |
import logging | |
import asyncio | |
from pathlib import Path | |
from sys import platform | |
from bs4 import BeautifulSoup | |
from webdriver_manager.chrome import ChromeDriverManager | |
from webdriver_manager.firefox import GeckoDriverManager | |
from selenium import webdriver | |
from selenium.webdriver.chrome.service import Service | |
from selenium.webdriver.chrome.options import Options as ChromeOptions | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.firefox.options import Options as FirefoxOptions | |
from selenium.webdriver.remote.webdriver import WebDriver | |
from selenium.webdriver.safari.options import Options as SafariOptions | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.webdriver.support.wait import WebDriverWait | |
from fastapi import WebSocket | |
import processing.text as summary | |
from config import Config | |
from processing.html import extract_hyperlinks, format_hyperlinks | |
from concurrent.futures import ThreadPoolExecutor | |
executor = ThreadPoolExecutor() | |
FILE_DIR = Path(__file__).parent.parent | |
CFG = Config() | |
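
# NOTE (assumption, inferred from usage below): Config is expected to expose at
# least `selenium_web_browser` ("chrome", "firefox", or "safari") and
# `user_agent`; see config.py for the authoritative definition.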


async def async_browse(url: str, question: str, websocket: WebSocket) -> str:
    """Browse a website and return the answer and links to the user.

    Args:
        url (str): The url of the website to browse
        question (str): The question asked by the user
        websocket (WebSocket): The websocket used to stream progress logs to the user

    Returns:
        str: The answer and links to the user
    """
    loop = asyncio.get_running_loop()
    print(f"Scraping url {url} with question {question}")
    await websocket.send_json(
        {
            "type": "logs",
            "output": f"Browsing {url} for information relevant to: {question}...",
        }
    )

    try:
        # Selenium calls are blocking, so run each step in the shared thread pool.
        driver, text = await loop.run_in_executor(executor, scrape_text_with_selenium, url)
        await loop.run_in_executor(executor, add_header, driver)
        summary_text = await loop.run_in_executor(
            executor, summary.summarize_text, url, text, question, driver
        )

        await websocket.send_json(
            {"type": "logs", "output": f"Information gathered from url {url}: {summary_text}"}
        )

        return f"Information gathered from url {url}: {summary_text}"
    except Exception as e:
        print(f"An error occurred while processing the url {url}: {e}")
        return f"Error processing the url {url}: {e}"


def browse_website(url: str, question: str) -> tuple[str, WebDriver | None]:
    """Browse a website and return the answer and links to the user.

    Args:
        url (str): The url of the website to browse
        question (str): The question asked by the user

    Returns:
        tuple[str, WebDriver | None]: The answer and links to the user, and the
            webdriver (None if no URL was given)
    """
    if not url:
        return "A URL was not specified, cancelling request to browse website.", None

    driver, text = scrape_text_with_selenium(url)
    add_header(driver)
    summary_text = summary.summarize_text(url, text, question, driver)
    links = scrape_links_with_selenium(driver, url)

    # Limit links to 5
    if len(links) > 5:
        links = links[:5]

    # write_to_file('research-{0}.txt'.format(url), summary_text + "\nSource Links: {0}\n\n".format(links))
    # Note: the driver is quit here; it is returned only to satisfy the
    # signature and should not be reused by callers.
    close_browser(driver)
    return f"Answer gathered from website: {summary_text} \n \n Links: {links}", driver


def scrape_text_with_selenium(url: str) -> tuple[WebDriver, str]:
    """Scrape text from a website using selenium

    Args:
        url (str): The url of the website to scrape

    Returns:
        tuple[WebDriver, str]: The webdriver and the text scraped from the website
    """
    logging.getLogger("selenium").setLevel(logging.CRITICAL)

    options_available = {
        "chrome": ChromeOptions,
        "safari": SafariOptions,
        "firefox": FirefoxOptions,
    }

    options = options_available[CFG.selenium_web_browser]()
    options.add_argument(f"user-agent={CFG.user_agent}")
    options.add_argument("--headless")
    options.add_argument("--enable-javascript")

    if CFG.selenium_web_browser == "firefox":
        service = Service(executable_path=GeckoDriverManager().install())
        driver = webdriver.Firefox(service=service, options=options)
    elif CFG.selenium_web_browser == "safari":
        # Requires a bit more setup on the user's end
        # See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari
        driver = webdriver.Safari(options=options)
    else:
        if platform == "linux" or platform == "linux2":
            options.add_argument("--disable-dev-shm-usage")
            options.add_argument("--remote-debugging-port=9222")

        options.add_argument("--no-sandbox")
        options.add_experimental_option("prefs", {"download_restrictions": 3})
        # Selenium >= 4.6 resolves chromedriver automatically via Selenium Manager.
        driver = webdriver.Chrome(options=options)

    print(f"scraping url {url}...")
    driver.get(url)

    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.TAG_NAME, "body"))
    )

    # Get the HTML content directly from the browser's DOM
    page_source = driver.execute_script("return document.body.outerHTML;")
    soup = BeautifulSoup(page_source, "html.parser")

    for script in soup(["script", "style"]):
        script.extract()

    text = get_text(soup)

    # Normalize whitespace: strip each line, split on double-space runs,
    # and drop empty chunks.
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    text = "\n".join(chunk for chunk in chunks if chunk)
    return driver, text
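
# The normalization above turns, e.g., "  Title \n\n  Body text  more  " into
# "Title\nBody text\nmore": each line is stripped, double-space runs start new
# chunks, and empty chunks are dropped.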


def get_text(soup):
    """Get the text from the soup

    Args:
        soup (BeautifulSoup): The soup to get the text from

    Returns:
        str: The text from the soup
    """
    text = ""
    tags = ["h1", "h2", "h3", "h4", "h5", "p"]
    for element in soup.find_all(tags):  # Find all heading and paragraph elements
        text += element.text + "\n\n"
    return text
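
# Example (illustrative):
#
#     >>> from bs4 import BeautifulSoup
#     >>> get_text(BeautifulSoup("<h1>Title</h1><p>Body</p><div>skipped</div>", "html.parser"))
#     'Title\n\nBody\n\n'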


def scrape_links_with_selenium(driver: WebDriver, url: str) -> list[str]:
    """Scrape links from a website using selenium

    Args:
        driver (WebDriver): The webdriver to use to scrape the links
        url (str): The url the page was loaded from, used to resolve relative links

    Returns:
        list[str]: The links scraped from the website
    """
    page_source = driver.page_source
    soup = BeautifulSoup(page_source, "html.parser")

    for script in soup(["script", "style"]):
        script.extract()

    hyperlinks = extract_hyperlinks(soup, url)

    return format_hyperlinks(hyperlinks)


def close_browser(driver: WebDriver) -> None:
    """Close the browser

    Args:
        driver (WebDriver): The webdriver to close

    Returns:
        None
    """
    driver.quit()


def add_header(driver: WebDriver) -> None:
    """Add a header to the website

    Args:
        driver (WebDriver): The webdriver to use to add the header

    Returns:
        None
    """
    # Use a context manager so the file handle is closed after reading.
    with open(FILE_DIR / "js" / "overlay.js", "r") as overlay_file:
        driver.execute_script(overlay_file.read())
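

if __name__ == "__main__":
    # Minimal smoke test (a sketch; the target URL is illustrative and this
    # assumes CFG points at an installed, configured browser). Scrape a page,
    # print a snippet of its text and its first few links, then quit.
    _driver, _text = scrape_text_with_selenium("https://example.com")
    print(_text[:500])
    print(scrape_links_with_selenium(_driver, "https://example.com")[:5])
    close_browser(_driver)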