Spaces:
Runtime error
Runtime error
File size: 6,973 Bytes
57b8424 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
"""Selenium web scraping module."""
from __future__ import annotations
import logging
import asyncio
from pathlib import Path
from sys import platform
from bs4 import BeautifulSoup
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.safari.options import Options as SafariOptions
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from fastapi import WebSocket
import processing.text as summary
from config import Config
from processing.html import extract_hyperlinks, format_hyperlinks
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor()
FILE_DIR = Path(__file__).parent.parent
CFG = Config()
async def async_browse(url: str, question: str, websocket: WebSocket) -> str:
"""Browse a website and return the answer and links to the user
Args:
url (str): The url of the website to browse
question (str): The question asked by the user
websocket (WebSocketManager): The websocket manager
Returns:
str: The answer and links to the user
"""
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=8)
print(f"Scraping url {url} with question {question}")
await websocket.send_json(
{"type": "logs", "output": f"π Browsing the {url} for relevant about: {question}..."})
try:
driver, text = await loop.run_in_executor(executor, scrape_text_with_selenium, url)
await loop.run_in_executor(executor, add_header, driver)
summary_text = await loop.run_in_executor(executor, summary.summarize_text, url, text, question, driver)
await websocket.send_json(
{"type": "logs", "output": f"π Information gathered from url {url}: {summary_text}"})
return f"Information gathered from url {url}: {summary_text}"
except Exception as e:
print(f"An error occurred while processing the url {url}: {e}")
return f"Error processing the url {url}: {e}"
def browse_website(url: str, question: str) -> tuple[str, WebDriver]:
"""Browse a website and return the answer and links to the user
Args:
url (str): The url of the website to browse
question (str): The question asked by the user
Returns:
Tuple[str, WebDriver]: The answer and links to the user and the webdriver
"""
if not url:
return "A URL was not specified, cancelling request to browse website.", None
driver, text = scrape_text_with_selenium(url)
add_header(driver)
summary_text = summary.summarize_text(url, text, question, driver)
links = scrape_links_with_selenium(driver, url)
# Limit links to 5
if len(links) > 5:
links = links[:5]
# write_to_file('research-{0}.txt'.format(url), summary_text + "\nSource Links: {0}\n\n".format(links))
close_browser(driver)
return f"Answer gathered from website: {summary_text} \n \n Links: {links}", driver
def scrape_text_with_selenium(url: str) -> tuple[WebDriver, str]:
"""Scrape text from a website using selenium
Args:
url (str): The url of the website to scrape
Returns:
Tuple[WebDriver, str]: The webdriver and the text scraped from the website
"""
logging.getLogger("selenium").setLevel(logging.CRITICAL)
options_available = {
"chrome": ChromeOptions,
"safari": SafariOptions,
"firefox": FirefoxOptions,
}
options = options_available[CFG.selenium_web_browser]()
options.add_argument(f"user-agent={CFG.user_agent}")
options.add_argument('--headless')
options.add_argument("--enable-javascript")
if CFG.selenium_web_browser == "firefox":
service = Service(executable_path=GeckoDriverManager().install())
driver = webdriver.Firefox(
service=service, options=options
)
elif CFG.selenium_web_browser == "safari":
# Requires a bit more setup on the users end
# See https://developer.apple.com/documentation/webkit/testing_with_webdriver_in_safari
driver = webdriver.Safari(options=options)
else:
if platform == "linux" or platform == "linux2":
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--remote-debugging-port=9222")
options.add_argument("--no-sandbox")
options.add_experimental_option(
"prefs", {"download_restrictions": 3}
)
driver = webdriver.Chrome(options=options)
print(f"scraping url {url}...")
driver.get(url)
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# Get the HTML content directly from the browser's DOM
page_source = driver.execute_script("return document.body.outerHTML;")
soup = BeautifulSoup(page_source, "html.parser")
for script in soup(["script", "style"]):
script.extract()
# text = soup.get_text()
text = get_text(soup)
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = "\n".join(chunk for chunk in chunks if chunk)
return driver, text
def get_text(soup):
"""Get the text from the soup
Args:
soup (BeautifulSoup): The soup to get the text from
Returns:
str: The text from the soup
"""
text = ""
tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'p']
for element in soup.find_all(tags): # Find all the <p> elements
text += element.text + "\n\n"
return text
def scrape_links_with_selenium(driver: WebDriver, url: str) -> list[str]:
"""Scrape links from a website using selenium
Args:
driver (WebDriver): The webdriver to use to scrape the links
Returns:
List[str]: The links scraped from the website
"""
page_source = driver.page_source
soup = BeautifulSoup(page_source, "html.parser")
for script in soup(["script", "style"]):
script.extract()
hyperlinks = extract_hyperlinks(soup, url)
return format_hyperlinks(hyperlinks)
def close_browser(driver: WebDriver) -> None:
"""Close the browser
Args:
driver (WebDriver): The webdriver to close
Returns:
None
"""
driver.quit()
def add_header(driver: WebDriver) -> None:
"""Add a header to the website
Args:
driver (WebDriver): The webdriver to use to add the header
Returns:
None
"""
driver.execute_script(open(f"{FILE_DIR}/js/overlay.js", "r").read())
|