#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import asyncio
import aiohttp
import requests
from urllib.parse import quote
from config import CONTENT_EXTRACTION, SEARCH_SELECTION
from src.core.web_loader import web_loader
class BrowserEngine:
    def __init__(self, configuration):
        self.config = configuration
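    # Build request headers that mimic a real browser, using rotating identity
    # values (IPs, user agent, origin, referrer, locale) supplied by web_loader.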
    def generate_headers(self):
        ipv4 = web_loader.get_ipv4()
        ipv6 = web_loader.get_ipv6()
        user_agent = web_loader.get_user_agent()
        origin = web_loader.get_origin()
        referrer = web_loader.get_referrer()
        location = web_loader.get_location()
        return {
            "User-Agent": user_agent,
            "X-Forwarded-For": f"{ipv4}, {ipv6}",
            "X-Real-IP": ipv4,
            "X-Originating-IP": ipv4,
            "X-Remote-IP": ipv4,
            "X-Remote-Addr": ipv4,
            "X-Client-IP": ipv4,
            "X-Forwarded-Host": origin.replace("https://", "").replace("http://", ""),
            "Origin": origin,
            "Referer": referrer,
            "Accept-Language": f"{location['language']},en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "cross-site",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
            "X-Country": location['country'],
            "X-Timezone": location['timezone']
        }
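    # Map a search provider to the reader-API URL and the CSS selector that
    # scopes the result listing: Baidu uses its own endpoint, while Google and
    # Bing go through the SearxNG endpoint via the "!go" / "!bi" bang prefixes.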
    def _build_search_url_and_selector(self, search_query: str, search_provider: str = "google"):
        if search_provider == "baidu":
            return (
                f"{self.config.content_reader_api}{self.config.baidu_endpoint}?wd={quote(search_query)}",
                "#content_left"
            )
        provider_prefix = "!go" if search_provider == "google" else "!bi"
        return (
            f"{self.config.content_reader_api}{self.config.searxng_endpoint}?q={quote(f'{provider_prefix} {search_query}')}",
            "#urls"
        )
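    # Async POST helper: return the response body as text, or raise a
    # ClientResponseError carrying that body for HTTP status codes >= 400.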
    async def _async_post(self, url: str, data: dict, headers: dict):
        timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, data=data, headers=headers) as response:
                text = await response.text()
                if response.status >= 400:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                        message=text,
                        headers=response.headers
                    )
                return text
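    # Async GET helper with the same error handling as _async_post.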
    async def _async_get(self, url: str, headers: dict):
        timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, headers=headers) as response:
                text = await response.text()
                if response.status >= 400:
                    raise aiohttp.ClientResponseError(
                        request_info=response.request_info,
                        history=response.history,
                        status=response.status,
                        message=text,
                        headers=response.headers
                    )
                return text
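    # Blocking fallbacks used by the public wrappers when asyncio.run() fails,
    # e.g. when an event loop is already running in the caller's thread.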
    def _sync_post(self, url: str, data: dict, headers: dict):
        response = requests.post(url, data=data, headers=headers, timeout=self.config.request_timeout)
        response.raise_for_status()
        return response.text
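    # Blocking GET counterpart to _sync_post.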
    def _sync_get(self, url: str, headers: dict):
        response = requests.get(url, headers=headers, timeout=self.config.request_timeout)
        response.raise_for_status()
        return response.text
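    # Fetch a page through the content-reader API and append the
    # CONTENT_EXTRACTION instruction block to the extracted text.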
    async def async_extract_page_content(self, target_url: str) -> str:
        headers = self.generate_headers()
        payload = {"url": target_url}
        extracted_content = await self._async_post(self.config.content_reader_api, payload, headers)
        return f"{extracted_content}\n\n\n{CONTENT_EXTRACTION}\n\n\n"
    def extract_page_content(self, target_url: str) -> str:
        try:
            return asyncio.run(self.async_extract_page_content(target_url))
        except Exception:
            try:
                headers = self.generate_headers()
                payload = {"url": target_url}
                extracted_content = self._sync_post(self.config.content_reader_api, payload, headers)
                return f"{extracted_content}\n\n\n{CONTENT_EXTRACTION}\n\n\n"
            except Exception as error:
                return f"Error reading URL: {str(error)}"
    async def async_perform_search(self, search_query: str, search_provider: str = "google") -> str:
        headers = self.generate_headers()
        full_url, selector = self._build_search_url_and_selector(search_query, search_provider)
        headers["X-Target-Selector"] = selector
        search_results = await self._async_get(full_url, headers)
        return f"{search_results}\n\n\n{SEARCH_SELECTION}\n\n\n"
    def perform_search(self, search_query: str, search_provider: str = "google") -> str:
        try:
            return asyncio.run(self.async_perform_search(search_query, search_provider))
        except Exception:
            try:
                headers = self.generate_headers()
                full_url, selector = self._build_search_url_and_selector(search_query, search_provider)
                headers["X-Target-Selector"] = selector
                search_results = self._sync_get(full_url, headers)
                return f"{search_results}\n\n\n{SEARCH_SELECTION}\n\n\n"
            except Exception as error:
                return f"Error during search: {str(error)}"