Spaces:
Running
Running
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import asyncio
import logging
from urllib.parse import quote

import aiohttp
import requests

from config import CONTENT_EXTRACTION, SEARCH_SELECTION
from src.core.web_loader import web_loader
class BrowserEngine:
    """HTTP client for the content-reader service named in the configuration.

    Page extraction POSTs the target URL to ``config.content_reader_api``;
    searching GETs a search URL that is prefixed with that same API base.
    Each operation has an ``async`` variant built on aiohttp and a blocking
    wrapper that falls back to ``requests`` when the async attempt fails or
    cannot be started.

    The configuration object must expose ``content_reader_api``,
    ``searxng_endpoint``, ``baidu_endpoint`` and ``request_timeout``.
    """

    def __init__(self, configuration):
        """Keep a reference to the configuration object used by all requests."""
        self.config = configuration

    def generate_headers(self):
        """Build the header set sent with every outgoing request.

        All identity values (IP addresses, user agent, origin, referrer and
        location) are fetched from ``web_loader`` on each call.

        Returns:
            dict: Mapping of header name to header value.
        """
        ipv4 = web_loader.get_ipv4()
        ipv6 = web_loader.get_ipv6()
        user_agent = web_loader.get_user_agent()
        origin = web_loader.get_origin()
        referrer = web_loader.get_referrer()
        # NOTE(review): indexed below by 'language', 'country' and 'timezone';
        # presumably a dict - confirm against web_loader.
        location = web_loader.get_location()
        return {
            "User-Agent": user_agent,
            "X-Forwarded-For": f"{ipv4}, {ipv6}",
            "X-Real-IP": ipv4,
            "X-Originating-IP": ipv4,
            "X-Remote-IP": ipv4,
            "X-Remote-Addr": ipv4,
            "X-Client-IP": ipv4,
            # Host header value is the origin with its URL scheme removed.
            "X-Forwarded-Host": origin.replace("https://", "").replace("http://", ""),
            "Origin": origin,
            "Referer": referrer,
            "Accept-Language": f"{location['language']},en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "cross-site",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
            "X-Country": location['country'],
            "X-Timezone": location['timezone'],
        }

    def _build_search_url_and_selector(self, search_query: str, search_provider: str = "google"):
        """Return ``(url, selector)`` for a search request.

        Args:
            search_query: Raw query text; it is percent-encoded here.
            search_provider: ``"baidu"`` uses the Baidu endpoint with a ``wd``
                parameter. ``"google"`` uses the SearXNG endpoint with the
                ``!go`` prefix; any other value uses SearXNG with ``!bi``.

        Returns:
            tuple[str, str]: The full request URL and the selector string that
            callers place in the ``X-Target-Selector`` header.
        """
        if search_provider == "baidu":
            return (
                f"{self.config.content_reader_api}{self.config.baidu_endpoint}?wd={quote(search_query)}",
                "#content_left"
            )
        provider_prefix = "!go" if search_provider == "google" else "!bi"
        encoded_query = quote(f"{provider_prefix} {search_query}")
        return (
            f"{self.config.content_reader_api}{self.config.searxng_endpoint}?q={encoded_query}",
            "#urls"
        )

    @staticmethod
    def _event_loop_is_running() -> bool:
        """Tell whether an asyncio event loop is running in the current thread."""
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return False
        return True

    async def _async_request(self, method: str, url: str, headers: dict, data: "dict | None" = None) -> str:
        """Send one aiohttp request and return the response body as text.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            url: Absolute request URL.
            headers: Headers to send.
            data: Optional form payload.

        Raises:
            aiohttp.ClientResponseError: If the status code is 400 or higher;
                the response body is carried in ``message``.
        """
        timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.request(method, url, data=data, headers=headers) as response:
                # Read the body before the status check so it can be attached
                # to the raised error.
                text = await response.text()
                if response.status >= 400:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                        message=text,
                        headers=response.headers
                    )
                return text

    async def _async_post(self, url: str, data: dict, headers: dict) -> str:
        """POST ``data`` to ``url`` asynchronously and return the body text."""
        return await self._async_request("POST", url, headers, data=data)

    async def _async_get(self, url: str, headers: dict) -> str:
        """GET ``url`` asynchronously and return the body text."""
        return await self._async_request("GET", url, headers)

    def _sync_post(self, url: str, data: dict, headers: dict) -> str:
        """POST ``data`` to ``url`` with ``requests``; raise on an HTTP error status."""
        response = requests.post(url, data=data, headers=headers, timeout=self.config.request_timeout)
        response.raise_for_status()
        return response.text

    def _sync_get(self, url: str, headers: dict) -> str:
        """GET ``url`` with ``requests``; raise on an HTTP error status."""
        response = requests.get(url, headers=headers, timeout=self.config.request_timeout)
        response.raise_for_status()
        return response.text

    async def async_extract_page_content(self, target_url: str) -> str:
        """Fetch the extracted content of ``target_url`` from the reader API.

        Returns:
            str: The response body followed by ``CONTENT_EXTRACTION``.

        Raises:
            Exception: Whatever the underlying request raises; nothing is
                caught here.
        """
        headers = self.generate_headers()
        payload = {"url": target_url}
        extracted_content = await self._async_post(self.config.content_reader_api, payload, headers)
        return f"{extracted_content}\n\n\n{CONTENT_EXTRACTION}\n\n\n"

    def extract_page_content(self, target_url: str) -> str:
        """Blocking page extraction that never raises.

        Tries the async implementation first and retries once with
        ``requests`` if that fails.

        Returns:
            str: The extracted content followed by ``CONTENT_EXTRACTION``, or
            an ``"Error reading URL: ..."`` message if both attempts fail.
        """
        # asyncio.run() refuses to start inside a running loop and would leave
        # the coroutine un-awaited, so go straight to the blocking path then.
        if not self._event_loop_is_running():
            try:
                return asyncio.run(self.async_extract_page_content(target_url))
            except Exception:
                logging.getLogger(__name__).debug(
                    "Async page extraction failed; retrying synchronously",
                    exc_info=True,
                )
        try:
            headers = self.generate_headers()
            payload = {"url": target_url}
            extracted_content = self._sync_post(self.config.content_reader_api, payload, headers)
            return f"{extracted_content}\n\n\n{CONTENT_EXTRACTION}\n\n\n"
        except Exception as error:
            return f"Error reading URL: {str(error)}"

    async def async_perform_search(self, search_query: str, search_provider: str = "google") -> str:
        """Run a web search through the reader API.

        Args:
            search_query: Raw query text.
            search_provider: See ``_build_search_url_and_selector``.

        Returns:
            str: The response body followed by ``SEARCH_SELECTION``.

        Raises:
            Exception: Whatever the underlying request raises; nothing is
                caught here.
        """
        headers = self.generate_headers()
        full_url, selector = self._build_search_url_and_selector(search_query, search_provider)
        # NOTE(review): presumably tells the reader service which page element
        # to return - confirm against the service documentation.
        headers["X-Target-Selector"] = selector
        search_results = await self._async_get(full_url, headers)
        return f"{search_results}\n\n\n{SEARCH_SELECTION}\n\n\n"

    def perform_search(self, search_query: str, search_provider: str = "google") -> str:
        """Blocking web search that never raises.

        Tries the async implementation first and retries once with
        ``requests`` if that fails.

        Returns:
            str: The search results followed by ``SEARCH_SELECTION``, or an
            ``"Error during search: ..."`` message if both attempts fail.
        """
        # asyncio.run() refuses to start inside a running loop and would leave
        # the coroutine un-awaited, so go straight to the blocking path then.
        if not self._event_loop_is_running():
            try:
                return asyncio.run(self.async_perform_search(search_query, search_provider))
            except Exception:
                logging.getLogger(__name__).debug(
                    "Async search failed; retrying synchronously",
                    exc_info=True,
                )
        try:
            headers = self.generate_headers()
            full_url, selector = self._build_search_url_and_selector(search_query, search_provider)
            headers["X-Target-Selector"] = selector
            search_results = self._sync_get(full_url, headers)
            return f"{search_results}\n\n\n{SEARCH_SELECTION}\n\n\n"
        except Exception as error:
            return f"Error during search: {str(error)}"