# scrape.py from fastapi import FastAPI, HTTPException, Request, Response from pydantic import BaseModel from typing import Optional import base64 import json import asyncio from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError from fastapi.responses import FileResponse import os import uuid app = FastAPI(title="Web Analyzer API") class ScreenshotResponse(BaseModel): screenshot: str class MetadataResponse(BaseModel): title: Optional[str] description: Optional[str] og: dict twitter: dict canonical: Optional[str] # Optional timeout wrapper to enforce global timeout async def timeout_wrapper(coro, timeout=20): try: return await asyncio.wait_for(coro, timeout) except asyncio.TimeoutError: raise HTTPException(status_code=504, detail="Operation timed out") # More robust get_page() with fallbacks, stealth, and logging async def get_page(url): print(f"[INFO] Visiting URL: {url}") pw = await async_playwright().start() browser = await pw.chromium.launch(headless=True) context = await browser.new_context() # Stealth mode: prevent simple headless detection await context.add_init_script( "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" ) page = await context.new_page() page.set_default_timeout(20000) # 20s max for waits on elements try: try: print("[INFO] Trying to load with 'domcontentloaded'") await page.goto(url, wait_until="domcontentloaded", timeout=20000) except PlaywrightTimeoutError: print("[WARN] domcontentloaded failed, trying 'load'") await page.goto(url, wait_until="load", timeout=20000) try: await page.wait_for_selector("body", timeout=5000) except Exception: print("[WARN] not found quickly. May still continue.") except Exception as e: print(f"[ERROR] Page load failed for {url}: {e}") await browser.close() await pw.stop() raise HTTPException(status_code=504, detail=f"Page load failed: {str(e)}") print("[INFO] Page loaded successfully.") return page, browser, pw # async def get_page(url): # pw = await async_playwright().start() # browser = await pw.chromium.launch(headless=True) # context = await browser.new_context() # # Stealth: hide headless detection # await context.add_init_script( # "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" # ) # page = await context.new_page() # page.set_default_timeout(90000) # Apply to all waits # try: # # Try networkidle first (wait for full load) # await page.goto(url, timeout=90000, wait_until="networkidle") # await page.wait_for_selector("body", timeout=10000) # Ensure DOM is visible # except PlaywrightTimeoutError: # try: # # Fallback to lighter load event # await page.goto(url, timeout=90000, wait_until="load") # except Exception as e: # await browser.close() # await pw.stop() # raise HTTPException(status_code=504, detail=f"Page load failed: {str(e)}") # return page, browser, pw @app.middleware("http") async def remove_leaky_headers(request: Request, call_next): response: Response = await call_next(request) # Safe header removal for header in [ "link", "x-proxied-host", "x-proxied-path", "x-proxied-replica", "server" ]: try: del response.headers[header] except KeyError: pass # Header not present # Add your own branded header response.headers["server"] = "Webrify-Secure-Gateway" return response @app.get("/metadata", response_model=MetadataResponse) async def get_metadata(url: str): page, browser, pw = await get_page(url) try: title = await page.title() # Get description meta tag try: desc = await page.get_attribute("meta[name='description']", "content") except Exception: desc = None # Extract Open Graph metadata og = {} for prop in ["title", "description", "image"]: try: selector = f"meta[property='og:{prop}']" if await page.query_selector(selector): og[f"og:{prop}"] = await page.get_attribute(selector, "content") else: og[f"og:{prop}"] = None except Exception: og[f"og:{prop}"] = None # Extract Twitter metadata twitter = {} for prop in ["title", "description", "image"]: try: selector = f"meta[name='twitter:{prop}']" if await page.query_selector(selector): twitter[f"twitter:{prop}"] = await page.get_attribute(selector, "content") else: twitter[f"twitter:{prop}"] = None except Exception: twitter[f"twitter:{prop}"] = None # Get canonical URL try: canonical = await page.get_attribute("link[rel='canonical']", "href") except Exception: canonical = None return { "title": title, "description": desc, "og": og, "twitter": twitter, "canonical": canonical } finally: await browser.close() await pw.stop() # @app.get("/screenshot", response_model=ScreenshotResponse) # async def get_screenshot(url: str): # page, browser, pw = await get_page(url) # try: # image_bytes = await page.screenshot(full_page=True) # image_base64 = base64.b64encode(image_bytes).decode() # return {"screenshot": image_base64} # finally: # await browser.close() # await pw.stop() # @app.get("/screenshot", response_model=ScreenshotResponse) # async def get_screenshot(url: str): # page, browser, pw = await get_page(url) # try: # # Scroll to bottom to trigger lazy-loaded content # await page.evaluate(""" # () => { # return new Promise((resolve) => { # let totalHeight = 0; # const distance = 100; # const timer = setInterval(() => { # window.scrollBy(0, distance); # totalHeight += distance; # if (totalHeight >= document.body.scrollHeight) { # clearInterval(timer); # resolve(); # } # }, 100); # }); # } # """) # # Give time for images and content to load # await page.wait_for_timeout(2000) # image_bytes = await page.screenshot(full_page=True) # image_base64 = base64.b64encode(image_bytes).decode() # return {"screenshot": image_base64} # finally: # await browser.close() # await pw.stop() @app.get("/screenshot", response_model=ScreenshotResponse) async def get_screenshot(url: str): page, browser, pw = await get_page(url) try: # Go to the page and wait until the network is idle await page.goto(url, wait_until="networkidle", timeout=90000) # Wait for the header (or similar element) to load try: await page.wait_for_selector("header", timeout=10000) except: pass # Don't fail if the header doesn't exist # Remove sticky or fixed header issues before full-page screenshot await page.add_style_tag(content=""" * { scroll-behavior: auto !important; } header, .sticky, .fixed, [style*="position:fixed"] { position: static !important; top: auto !important; } """) # Scroll down to trigger lazy loading await page.evaluate(""" () => { return new Promise((resolve) => { let totalHeight = 0; const distance = 100; const timer = setInterval(() => { window.scrollBy(0, distance); totalHeight += distance; if (totalHeight >= document.body.scrollHeight) { clearInterval(timer); resolve(); } }, 100); }); } """) # Wait to ensure lazy content and animations complete await page.wait_for_timeout(2000) # Take full-page screenshot image_bytes = await page.screenshot(full_page=True) image_base64 = base64.b64encode(image_bytes).decode() return {"screenshot": image_base64} finally: await browser.close() await pw.stop() @app.get("/seo") async def seo_audit(url: str): page, browser, pw = await get_page(url) try: h1_count = await page.locator("h1").count() imgs = await page.query_selector_all("img") missing_alts = [await img.get_attribute("src") for img in imgs if not await img.get_attribute("alt")] anchors = await page.query_selector_all("a[href]") internal, external = 0, 0 for a in anchors: href = await a.get_attribute("href") if href and href.startswith("http"): if url in href: internal += 1 else: external += 1 try: robots = await page.get_attribute("meta[name='robots']", "content") except Exception: robots = None try: canonical = await page.get_attribute("link[rel='canonical']", "href") except Exception: canonical = None return { "h1_count": h1_count, "missing_image_alts": missing_alts, "internal_links": internal, "external_links": external, "robots_meta": robots, "has_canonical": bool(canonical) } finally: await browser.close() await pw.stop() @app.get("/performance") async def performance_metrics(url: str): page, browser, pw = await get_page(url) try: # Get navigation timing try: nav_timing = await page.evaluate("JSON.stringify(performance.getEntriesByType('navigation'))") timing = json.loads(nav_timing)[0] if nav_timing else {} page_load_time = timing.get('duration', None) except Exception: page_load_time = None # Get First Contentful Paint try: fcp = await page.evaluate("performance.getEntriesByName('first-contentful-paint')[0]?.startTime") except Exception: fcp = None # Get Largest Contentful Paint try: lcp = await page.evaluate("performance.getEntriesByType('largest-contentful-paint')[0]?.renderTime") except Exception: lcp = None # Get Cumulative Layout Shift try: cls_entries = await page.evaluate("JSON.stringify(performance.getEntriesByType('layout-shift'))") cls = sum(e.get('value', 0) for e in json.loads(cls_entries) if isinstance(e, dict)) except Exception: cls = None return { "page_load_time_ms": page_load_time, "first_contentful_paint": fcp, "largest_contentful_paint": lcp, "cumulative_layout_shift": cls } finally: await browser.close() await pw.stop() @app.get("/structured-data") async def structured_data(url: str): page, browser, pw = await get_page(url) try: scripts = await page.query_selector_all("script[type='application/ld+json']") json_ld_list = [] for s in scripts: text = await s.inner_text() try: data = json.loads(text) json_ld_list.append(data) except Exception: continue types = [] for obj in json_ld_list: if isinstance(obj, dict) and "@type" in obj: types.append(obj["@type"]) return { "schema_found": bool(json_ld_list), "types": types, "schema": json_ld_list } finally: await browser.close() await pw.stop() @app.get("/accessibility") async def accessibility_check(url: str): page, browser, pw = await get_page(url) try: imgs = await page.query_selector_all("img") missing_alt = len([img for img in imgs if not await img.get_attribute("alt")]) buttons = await page.query_selector_all("button") missing_labels = len([b for b in buttons if not await b.get_attribute("aria-label") and not await b.inner_text()]) landmarks = [] for tag in ["main", "nav", "footer", "header"]: if await page.query_selector(tag): landmarks.append(tag) return { "images_missing_alt": missing_alt, "buttons_missing_label": missing_labels, "landmarks": landmarks } finally: await browser.close() await pw.stop() @app.get("/html-to-pdf") async def convert_html_to_pdf(url: str): from playwright.async_api import async_playwright filename = f"{uuid.uuid4().hex}.pdf" output_path = f"/tmp/{filename}" # Or use another temp dir pw = await async_playwright().start() browser = await pw.chromium.launch() page = await browser.new_page() try: await page.goto(url, wait_until="networkidle") await page.pdf( path=output_path, format="A4", print_background=True, margin={"top": "1cm", "bottom": "1cm", "left": "1cm", "right": "1cm"}, ) finally: await browser.close() await pw.stop() # Serve the file and remove after response return FileResponse( path=output_path, filename="webpage.pdf", media_type="application/pdf", headers={"Content-Disposition": "attachment; filename=webpage.pdf"} )