from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from playwright.async_api import async_playwright
import asyncio
import base64
import logging
from typing import List, Optional
from urllib.parse import urlparse

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Playwright Web Scraper", description="A simple web scraper using Playwright")


class LinkInfo(BaseModel):
    text: str
    href: str


class ContactInfo(BaseModel):
    emails: List[str] = []
    phones: List[str] = []
    social_media: List[str] = []
    contact_forms: List[str] = []


class ScriptInfo(BaseModel):
    src: str
    script_type: Optional[str] = None
    is_external: bool = False


class BusinessInfo(BaseModel):
    company_name: Optional[str] = None
    address: Optional[str] = None
    description: Optional[str] = None
    industry_keywords: List[str] = []


class LeadData(BaseModel):
    contact_info: ContactInfo
    business_info: BusinessInfo
    lead_score: int = 0
    technologies: List[str] = []


class ScrapeResponse(BaseModel):
    full_html: Optional[str] = None
    body_content: Optional[str] = None
    screenshot: Optional[str] = None
    links: Optional[List[LinkInfo]] = None
    scripts: Optional[List[ScriptInfo]] = None
    page_title: Optional[str] = None
    meta_description: Optional[str] = None
    lead_data: Optional[LeadData] = None


visited_urls = set()


@app.get("/")
async def root():
    return {
        "message": "🚀 Lead Generation Web Scraper API",
        "tagline": "Turn any website into qualified leads",
        "endpoints": {
            "/scrape": "Extract leads, contacts, and business data from any website",
            "/docs": "API documentation"
        }
    }


def normalize_url(url):
    parsed = urlparse(url)
    return parsed._replace(fragment='', query='').geturl().rstrip('/')


@app.get("/scrape")
async def scrape_page(
    url: str = Query(..., description="URL to scrape"),
    lead_generation: bool = Query(True, description="Extract lead generation data (emails, phones, business info)"),
    screenshot: bool = Query(True, description="Take a full page screenshot"),
    get_links: bool = Query(True, description="Extract all links from the page"),
    get_body: bool = Query(False, description="Extract body tag content (can be large)"),
    get_frontend: bool = Query(True, description="Get full rendered frontend HTML content")
):
    norm_url = normalize_url(url)
    if norm_url in visited_urls:
        raise HTTPException(status_code=400, detail="URL already scraped")
    visited_urls.add(norm_url)

    logger.info(f"Starting scrape for URL: {norm_url}")

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()

            try:
                await page.goto(norm_url, wait_until="domcontentloaded", timeout=60000)

                response = ScrapeResponse()
                response.page_title = await page.title()
                response.meta_description = await page.evaluate("""
                    () => {
                        const meta = document.querySelector('meta[name="description"]');
                        return meta ? meta.getAttribute('content') : null;
                    }
                """)
                if get_frontend:
                    response.full_html = await page.content()

                if get_body:
                    # Body text with script/style/noscript elements stripped out
                    response.body_content = await page.evaluate("""
                        () => {
                            const body = document.querySelector('body');
                            if (!body) return null;
                            const scripts = body.querySelectorAll('script, style, noscript');
                            scripts.forEach(el => el.remove());
                            return body.innerText.trim();
                        }
                    """)

                if screenshot:
                    screenshot_bytes = await page.screenshot(full_page=True)
                    response.screenshot = base64.b64encode(screenshot_bytes).decode('utf-8')

                if get_links:
                    links = await page.evaluate("""
                        () => {
                            return Array.from(document.querySelectorAll('a[href]')).map(a => {
                                const text = a.innerText.trim() || a.getAttribute('aria-label') || a.getAttribute('title') || a.href;
                                const href = a.href;
                                if (href && href.startsWith('http')) {
                                    return { text: text.substring(0, 200), href: href };
                                }
                                return null;
                            }).filter(link => link !== null);
                        }
                    """)
                    response.links = [LinkInfo(**link) for link in links]

                if lead_generation:
                    # Raw string so the regex backslashes reach the browser untouched
                    lead_data_raw = await page.evaluate(r"""
                        () => {
                            const result = {
                                emails: [],
                                phones: [],
                                social_media: [],
                                contact_forms: [],
                                company_name: null,
                                address: null,
                                technologies: [],
                                industry_keywords: []
                            };

                            // Emails: visible text plus mailto: links
                            const emailRegex = /[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/g;
                            const pageText = document.body.innerText;
                            const emails = pageText.match(emailRegex) || [];
                            const mailtoEmails = Array.from(document.querySelectorAll('a[href^="mailto:"]'))
                                .map(a => a.href.replace(/^mailto:/, '').split('?')[0]);
                            result.emails = [...new Set([...emails, ...mailtoEmails])].slice(0, 10);

                            // Phone numbers: visible text plus tel: links
                            const phoneRegex = /(\+?1?[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})/g;
                            const phones = pageText.match(phoneRegex) || [];
                            const telPhones = Array.from(document.querySelectorAll('a[href^="tel:"]'))
                                .map(a => a.href.replace(/^tel:/, '').split('?')[0]);
                            result.phones = [...new Set([...phones, ...telPhones])].slice(0, 5);

                            // Social media profile links
                            const socialLinks = Array.from(document.querySelectorAll('a[href]'))
                                .map(a => a.href)
                                .filter(href => /facebook|twitter|linkedin|instagram|youtube|tiktok/i.test(href));
                            result.social_media = [...new Set(socialLinks)].slice(0, 10);

                            // Contact forms (form action URLs)
                            const forms = Array.from(document.querySelectorAll('form')).map(form => form.action || window.location.href);
                            result.contact_forms = [...new Set(forms)].slice(0, 5);

                            // Company name from meta tags, the first h1, or the page title
                            result.company_name = document.querySelector('meta[property="og:site_name"]')?.content ||
                                document.querySelector('meta[name="application-name"]')?.content ||
                                document.querySelector('h1')?.innerText?.trim() ||
                                document.title?.split('|')[0]?.split('-')[0]?.trim();

                            // US-style street address
                            const addressRegex = /\d+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl)\s*,?\s*[A-Za-z\s]+,?\s*[A-Z]{2}\s*\d{5}/g;
                            const addresses = pageText.match(addressRegex) || [];
                            result.address = addresses[0] || null;

                            // Technology and industry keyword detection
                            const techKeywords = ['wordpress', 'shopify', 'react', 'angular', 'vue', 'bootstrap', 'jquery', 'google analytics', 'facebook pixel'];
                            const htmlContent = document.documentElement.outerHTML.toLowerCase();
                            result.technologies = techKeywords.filter(tech => htmlContent.includes(tech));

                            const industryKeywords = ['consulting', 'marketing', 'software', 'healthcare', 'finance', 'real estate', 'education', 'retail', 'manufacturing', 'legal', 'restaurant', 'fitness', 'beauty', 'automotive'];
                            const lowerPageText = pageText.toLowerCase();
                            result.industry_keywords = industryKeywords.filter(keyword => lowerPageText.includes(keyword));

                            return result;
                        }
                    """)

                    # Simple additive lead score, capped at 100 below
                    lead_score = 0
                    if lead_data_raw['emails']: lead_score += 30
                    if lead_data_raw['phones']: lead_score += 25
                    if lead_data_raw['contact_forms']: lead_score += 15
                    if lead_data_raw['social_media']: lead_score += 10
                    if lead_data_raw['company_name']: lead_score += 10
                    if lead_data_raw['address']: lead_score += 10
                    if lead_data_raw['technologies']: lead_score += 5
                    if lead_data_raw['industry_keywords']: lead_score += 5

                    contact_info = ContactInfo(
                        emails=lead_data_raw['emails'],
                        phones=lead_data_raw['phones'],
                        social_media=lead_data_raw['social_media'],
                        contact_forms=lead_data_raw['contact_forms']
                    )

                    business_info = BusinessInfo(
                        company_name=lead_data_raw['company_name'],
                        address=lead_data_raw['address'],
                        description=response.meta_description,
                        industry_keywords=lead_data_raw['industry_keywords']
                    )

                    response.lead_data = LeadData(
                        contact_info=contact_info,
                        business_info=business_info,
                        lead_score=min(lead_score, 100),
                        technologies=lead_data_raw['technologies']
                    )

                await browser.close()
                logger.info("Scraping completed successfully")
                return response

            except Exception as e:
                logger.error(f"Error during scraping: {str(e)}")
                await browser.close()
                raise HTTPException(status_code=500, detail=f"Scraping error: {str(e)}")

    except HTTPException:
        # Re-raise as-is so the scraping 500 above is not re-wrapped as a launch error
        raise
    except Exception as e:
        logger.error(f"Error launching browser: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Browser launch error: {str(e)}")
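

# Optional local entry point: a minimal sketch of how this app might be served, not part
# of the original module. It assumes the file is saved as main.py and that uvicorn is
# installed; the equivalent CLI would be `uvicorn main:app --reload`. Once running, the
# endpoint can be exercised with e.g.
#   curl "http://127.0.0.1:8000/scrape?url=https://example.com&screenshot=false"
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)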