import subprocess
import tempfile
import os
import re
import gzip
import zlib
from io import StringIO
from functools import lru_cache
from typing import Optional, Dict, Union
from urllib.parse import unquote

import brotli
import zstandard as zstd
from browserforge.headers import Browser, HeaderGenerator
from tldextract import extract
from markitdown import MarkItDown
from markdown import Markdown
from smolagents import tool


class Response:
    """Wraps a raw response and lazily converts its body to Markdown or plain text."""

    def __init__(self, response, convert_to_markdown, convert_to_plain_text):
        self._response = response
        self._convert_to_markdown = convert_to_markdown
        self._convert_to_plain_text = convert_to_plain_text
        self._markdown = None
        self._plain_text = None

    def __getattr__(self, item):
        # Delegate unknown attribute lookups to the wrapped response object.
        return getattr(self._response, item)

    def markdown(self) -> str:
        # Convert the response body on first call and cache the result.
        if self._markdown is None:
            self._markdown = self._convert_to_markdown(self._response.content)
        return self._markdown

    def plain_text(self) -> str:
        # Convert the response body on first call and cache the result.
        if self._plain_text is None:
            self._plain_text = self._convert_to_plain_text(self._response.content)
        return self._plain_text


def generate_headers() -> Dict[str, str]:
    browsers = [
        Browser(name='chrome', min_version=120),
        Browser(name='firefox', min_version=120),
        Browser(name='edge', min_version=120),
    ]
    return HeaderGenerator(browser=browsers, device='desktop').generate()


def generate_convincing_referer(url: str) -> str:
    website_name = extract(url).domain
    return f'https://www.google.com/search?q={website_name}'
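

# Illustrative note: HeaderGenerator.generate() returns a dict of realistic browser
# headers (keys such as 'User-Agent' and 'Accept-Language'), and the referer helper
# builds a plausible Google search URL from the registered domain, e.g.
#   generate_convincing_referer("https://docs.python.org/3/")
#   -> "https://www.google.com/search?q=python"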


def headers_job(headers: Optional[Dict], url: str) -> Dict:
    headers = headers or {}
    # Merge in generated browser-like headers (this includes a User-Agent) and add a
    # plausible referer for the target site.
    headers.update(generate_headers())
    headers['referer'] = generate_convincing_referer(url)
    return headers


def convert_to_markdown(content: bytes) -> str:
    md = MarkItDown()
    temp_path = None
    try:
        # Write the raw bytes to a temporary file so MarkItDown can convert it.
        with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
            tmp_file.write(content)
            tmp_file.flush()
            temp_path = tmp_file.name
        return md.convert_local(temp_path).text_content
    finally:
        # Always clean up the temporary file.
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)


def convert_to_plain_text(content: bytes) -> str:
    md_content = convert_to_markdown(content)

    def unmark_element(element, stream=None):
        # Recursively write the text of an ElementTree node, dropping all markup.
        if stream is None:
            stream = StringIO()
        if element.text:
            stream.write(element.text)
        for sub in element:
            unmark_element(sub, stream)
        if element.tail:
            stream.write(element.tail)
        return stream.getvalue()

    # Register a "plain" output format that serializes the parsed document as bare text.
    Markdown.output_formats["plain"] = unmark_element
    __md = Markdown(output_format="plain")
    __md.stripTopLevelTags = False
    final_text = __md.convert(md_content)
    # Collapse newlines so the result is a single line of text.
    final_text = re.sub(r"\n+", " ", final_text)
    return final_text
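

# Illustrative sketch (assuming MarkItDown recognizes the payload as HTML): the helpers
# above turn a raw HTML body into Markdown and then into flattened plain text, roughly:
#   html = b"<html><body><h1>Title</h1><p>Hello <b>world</b>.</p></body></html>"
#   convert_to_markdown(html)    # ~ "# Title\n\nHello **world**."
#   convert_to_plain_text(html)  # ~ "Title Hello world."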


class BasicScraper:
    """Basic scraper class for making HTTP requests using curl."""

    def __init__(
        self,
        proxy: Optional[str] = None,
        follow_redirects: bool = True,
        timeout: Optional[Union[int, float]] = None,
        retries: Optional[int] = 3
    ):
        self.proxy = proxy
        self.timeout = timeout
        self.follow_redirects = bool(follow_redirects)
        self.retries = retries

    def _curl_get(
        self,
        url: str,
        headers: Dict[str, str],
        cookies: Optional[Dict],
        timeout: Optional[Union[int, float]],
        proxy: Optional[str],
        follow_redirects: bool
    ) -> bytes:
        # Use -i to include HTTP headers in the output.
        curl_command = ["curl", "-s", "-i"]
        if follow_redirects:
            curl_command.append("-L")
        if self.retries:
            curl_command.extend(["--retry", str(self.retries)])
        # Add headers.
        for key, value in headers.items():
            curl_command.extend(["-H", f"{key}: {value}"])
        # Add cookies if provided.
        if cookies:
            cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
            curl_command.extend(["--cookie", cookie_str])
        # Set proxy if specified.
        if proxy:
            curl_command.extend(["--proxy", proxy])
        # Set timeout options.
        if timeout:
            curl_command.extend(["--connect-timeout", str(timeout), "--max-time", str(timeout)])
        curl_command.append(url)
        try:
            result = subprocess.run(
                curl_command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=False
            )
            if result.returncode != 0:
                raise Exception(f"Curl command failed: {result.stderr.decode('utf-8')}")
            raw_response = result.stdout
            # With -i (and -L) curl prepends one header block per response, so peel header
            # blocks off the front; this avoids truncating bodies that contain blank lines.
            body = raw_response
            last_header_block = b""
            while body.startswith(b"HTTP/"):
                header_block, separator, rest = body.partition(b'\r\n\r\n')
                if not separator:
                    break
                last_header_block = header_block
                body = rest
            # Look for a Content-Encoding header in the last header block.
            content_encoding = None
            for line in last_header_block.decode('utf-8', errors='ignore').splitlines():
                if line.lower().startswith("content-encoding:"):
                    content_encoding = line.split(":", 1)[1].strip().lower()
                    break
            # Decode Brotli, Zstandard, gzip, or deflate bodies as needed.
            if content_encoding:
                try:
                    if 'br' in content_encoding:
                        body = brotli.decompress(body)
                    elif 'zstd' in content_encoding:
                        dctx = zstd.ZstdDecompressor()
                        try:
                            body = dctx.decompress(body)
                        except zstd.ZstdError as e:
                            # Fall back to streaming decompression if the content size is unknown.
                            if "could not determine content size" in str(e):
                                dctx_stream = zstd.ZstdDecompressor().decompressobj()
                                body = dctx_stream.decompress(body)
                                body += dctx_stream.flush()
                            else:
                                raise
                    elif 'gzip' in content_encoding:
                        body = gzip.decompress(body)
                    elif 'deflate' in content_encoding:
                        body = zlib.decompress(body)
                except Exception as e:
                    raise Exception(f"Error decompressing content: {e}")
            return body
        except Exception as e:
            raise Exception(f"Error during curl request: {e}")

    def get(
        self,
        url: str,
        cookies: Optional[Dict] = None,
        timeout: Optional[Union[int, float]] = None,
        **kwargs: Dict
    ) -> Response:
        # Normalize the URL: decode percent-escapes and replace literal spaces.
        url = unquote(url).replace(" ", "+")
        hdrs = headers_job(kwargs.pop('headers', {}), url)
        # The instance-level timeout, when set, takes precedence over the per-call value.
        effective_timeout = self.timeout if self.timeout is not None else timeout
        content = self._curl_get(
            url,
            headers=hdrs,
            cookies=cookies,
            timeout=effective_timeout,
            proxy=self.proxy,
            follow_redirects=self.follow_redirects
        )

        # Create a dummy response object with a 'content' attribute.
        class DummyResponse:
            pass

        dummy = DummyResponse()
        dummy.content = content
        return Response(
            response=dummy,
            convert_to_markdown=convert_to_markdown,
            convert_to_plain_text=convert_to_plain_text
        )
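

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): fetch a page with the curl-backed scraper
    # and render it as Markdown and plain text. The URL is a placeholder; a local `curl`
    # binary and network access are assumed.
    scraper = BasicScraper(follow_redirects=True, timeout=30, retries=3)
    response = scraper.get("https://example.com")
    print(response.markdown()[:500])     # first 500 characters of the Markdown rendering
    print(response.plain_text()[:500])   # first 500 characters of the plain-text version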