"""Lead-generation web scraper API built on FastAPI and Playwright.

Exposes two endpoints:

* ``GET /``       -- static description of the service.
* ``GET /scrape`` -- loads a URL in headless Chromium and returns page
  metadata plus, optionally, a screenshot, links, body text and
  heuristically extracted lead data.
"""

import asyncio
import base64
import logging
from typing import List, Optional
from urllib.parse import urlparse

from fastapi import FastAPI, HTTPException, Query
from playwright.async_api import Page, async_playwright
from pydantic import BaseModel

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Playwright Web Scraper",
    description="A simple web scraper using Playwright",
)


# ---------------------------------------------------------------------------
# Response models
# ---------------------------------------------------------------------------

class LinkInfo(BaseModel):
    """A single hyperlink: visible text and absolute target URL."""

    text: str
    href: str


class ContactInfo(BaseModel):
    """Contact details extracted from a page."""

    emails: List[str] = []
    phones: List[str] = []
    social_media: List[str] = []
    contact_forms: List[str] = []


class ScriptInfo(BaseModel):
    """Description of a script element.

    NOTE(review): not populated by any code in this module.
    """

    src: str
    script_type: Optional[str] = None
    is_external: bool = False


class BusinessInfo(BaseModel):
    """Business details extracted from a page."""

    company_name: Optional[str] = None
    address: Optional[str] = None
    description: Optional[str] = None
    industry_keywords: List[str] = []


class LeadData(BaseModel):
    """Aggregated lead information and its heuristic quality score."""

    contact_info: ContactInfo
    business_info: BusinessInfo
    lead_score: int = 0
    technologies: List[str] = []


class ScrapeResponse(BaseModel):
    """Payload returned by ``/scrape``; unrequested sections stay ``None``."""

    body_content: Optional[str] = None
    screenshot: Optional[str] = None
    links: Optional[List[LinkInfo]] = None
    # NOTE(review): ``scripts`` is declared but never filled in by scrape_page.
    scripts: Optional[List[ScriptInfo]] = None
    page_title: Optional[str] = None
    meta_description: Optional[str] = None
    lead_data: Optional[LeadData] = None


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Command-line flags passed to headless Chromium on launch.
_CHROMIUM_ARGS = [
    '--no-sandbox',
    '--disable-setuid-sandbox',
    '--disable-dev-shm-usage',
    '--disable-accelerated-2d-canvas',
    '--no-first-run',
    '--no-zygote',
    '--disable-gpu',
]

# Navigation timeout handed to page.goto(), in milliseconds.
_NAVIGATION_TIMEOUT_MS = 60000

# Points awarded when the corresponding lead field is non-empty.
_LEAD_SCORE_WEIGHTS = {
    'emails': 30,
    'phones': 25,
    'contact_forms': 20,
    'social_media': 15,
    'company_name': 10,
    'address': 15,
    'technologies': 10,
    'industry_keywords': 5,
}

# The weights sum to more than 100, so the final score is capped.
_MAX_LEAD_SCORE = 100

# The JavaScript snippets below are raw strings: the embedded regular
# expressions contain sequences such as "\." and "\s" that are not valid
# Python escapes and would trigger invalid-escape warnings in a normal string.

_META_DESCRIPTION_JS = r"""
() => {
    const meta = document.querySelector('meta[name="description"]');
    return meta ? meta.getAttribute('content') : null;
}
"""

# WARNING: this snippet mutates the live DOM (removes script/style/noscript
# elements from <body>), so it must run after every other page read.
_BODY_TEXT_JS = r"""
() => {
    const body = document.querySelector('body');
    if (!body) return null;

    // Remove script and style elements
    const scripts = body.querySelectorAll('script, style, noscript');
    scripts.forEach(el => el.remove());

    // Get clean text content
    return body.innerText.trim();
}
"""

_LINKS_JS = r"""
() => {
    return Array.from(document.querySelectorAll('a[href]')).map(a => {
        const text = a.innerText.trim();
        const href = a.href;

        // Only include links with meaningful text and valid URLs
        if (text && href && href.startsWith('http')) {
            return {
                text: text.substring(0, 200),  // Limit text length
                href: href
            }
        }
        return null;
    }).filter(link => link !== null);
}
"""

_LEAD_DATA_JS = r"""
() => {
    const result = {
        emails: [],
        phones: [],
        social_media: [],
        contact_forms: [],
        company_name: null,
        address: null,
        technologies: [],
        industry_keywords: []
    };

    // Extract emails
    const emailRegex = /[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/g;
    const pageText = document.body.innerText;
    const emails = pageText.match(emailRegex) || [];
    result.emails = [...new Set(emails)].slice(0, 10); // Unique emails, max 10

    // Extract phone numbers
    const phoneRegex = /(\+?1?[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})/g;
    const phones = pageText.match(phoneRegex) || [];
    result.phones = [...new Set(phones)].slice(0, 5); // Unique phones, max 5

    // Extract social media links
    const socialLinks = Array.from(document.querySelectorAll('a[href]')).map(a => a.href)
        .filter(href => /facebook|twitter|linkedin|instagram|youtube|tiktok/i.test(href));
    result.social_media = [...new Set(socialLinks)].slice(0, 10);

    // Find contact forms
    const forms = Array.from(document.querySelectorAll('form')).map(form => {
        const action = form.action || window.location.href;
        return action;
    });
    result.contact_forms = [...new Set(forms)].slice(0, 5);

    // Extract company name (try multiple methods)
    result.company_name =
        document.querySelector('meta[property="og:site_name"]')?.content ||
        document.querySelector('meta[name="application-name"]')?.content ||
        document.querySelector('h1')?.innerText?.trim() ||
        document.title?.split('|')[0]?.split('-')[0]?.trim();

    // Extract address
    const addressRegex = /\d+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl)\s*,?\s*[A-Za-z\s]+,?\s*[A-Z]{2}\s*\d{5}/g;
    const addresses = pageText.match(addressRegex) || [];
    result.address = addresses[0] || null;

    // Detect technologies
    const techKeywords = ['wordpress', 'shopify', 'react', 'angular', 'vue', 'bootstrap', 'jquery', 'google analytics', 'facebook pixel'];
    const htmlContent = document.documentElement.outerHTML.toLowerCase();
    result.technologies = techKeywords.filter(tech => htmlContent.includes(tech));

    // Industry keywords
    const industryKeywords = ['consulting', 'marketing', 'software', 'healthcare', 'finance', 'real estate', 'education', 'retail', 'manufacturing', 'legal', 'restaurant', 'fitness', 'beauty', 'automotive'];
    const lowerPageText = pageText.toLowerCase();
    result.industry_keywords = industryKeywords.filter(keyword => lowerPageText.includes(keyword));

    return result;
}
"""


# ---------------------------------------------------------------------------
# Endpoints and helpers
# ---------------------------------------------------------------------------

@app.get("/")
async def root():
    """Return a static description of the API and its features."""
    return {
        "message": "🚀 Lead Generation Web Scraper API",
        "tagline": "Turn any website into qualified leads",
        "endpoints": {
            "/scrape": "Extract leads, contacts, and business data from any website",
            "/docs": "API documentation"
        },
        "example": "/scrape?url=https://example.com&lead_generation=true&screenshot=true",
        "lead_generation_features": [
            "📧 Extract email addresses and contact forms",
            "📞 Find phone numbers and contact info",
            "🏢 Identify company names and addresses",
            "🔗 Discover social media profiles",
            "⚡ Detect technologies and tools used",
            "📊 Calculate lead quality scores",
            "🎯 Industry keyword extraction"
        ],
        "basic_features": [
            "📄 Clean body text extraction",
            "🔗 Smart link filtering",
            # The next two entries previously contained U+FFFD replacement
            # characters (mis-encoded emoji); repaired here.
            "📜 Script and JavaScript file extraction",
            "📸 Full page screenshots",
            "📋 Page metadata extraction"
        ],
        "use_cases": [
            "B2B lead generation",
            "Sales prospecting",
            "Market research",
            "Competitor analysis",
            "Contact discovery"
        ]
    }


def _validate_url(url: str) -> None:
    """Reject anything that is not an absolute http(s) URL with a 400.

    The URL comes straight from the query string and is handed to a real
    browser, so schemes such as ``file://`` must not be allowed through.

    NOTE(review): this does not stop requests to internal/private hosts
    (SSRF); add an allow/deny list if the service is exposed publicly.
    """
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise HTTPException(
            status_code=400,
            detail="Invalid URL: an absolute http:// or https:// URL is required",
        )


def _calculate_lead_score(lead_data_raw: dict) -> int:
    """Sum the weights of every non-empty lead field, capped at 100."""
    score = sum(
        weight
        for field, weight in _LEAD_SCORE_WEIGHTS.items()
        if lead_data_raw.get(field)
    )
    return min(score, _MAX_LEAD_SCORE)


def _build_lead_data(lead_data_raw: dict, description: Optional[str]) -> LeadData:
    """Convert the dict returned by ``_LEAD_DATA_JS`` into a ``LeadData``.

    ``description`` is the page's meta description and is stored as the
    business description.
    """
    contact_info = ContactInfo(
        emails=lead_data_raw.get('emails') or [],
        phones=lead_data_raw.get('phones') or [],
        social_media=lead_data_raw.get('social_media') or [],
        contact_forms=lead_data_raw.get('contact_forms') or [],
    )
    business_info = BusinessInfo(
        company_name=lead_data_raw.get('company_name'),
        address=lead_data_raw.get('address'),
        description=description,
        industry_keywords=lead_data_raw.get('industry_keywords') or [],
    )
    return LeadData(
        contact_info=contact_info,
        business_info=business_info,
        lead_score=_calculate_lead_score(lead_data_raw),
        technologies=lead_data_raw.get('technologies') or [],
    )


async def _collect_page_data(
    page: Page,
    url: str,
    *,
    lead_generation: bool,
    screenshot: bool,
    get_links: bool,
    get_body: bool,
) -> ScrapeResponse:
    """Navigate ``page`` to ``url`` and gather the requested sections."""
    logger.info("Navigating to %s...", url)
    # Alternative left disabled by the original author:
    # await page.goto(url, wait_until="networkidle")
    await page.goto(url, wait_until="domcontentloaded", timeout=_NAVIGATION_TIMEOUT_MS)

    response = ScrapeResponse()

    # Always get page title and meta description
    logger.info("Getting page metadata...")
    response.page_title = await page.title()
    response.meta_description = await page.evaluate(_META_DESCRIPTION_JS)

    # Get screenshot (full page)
    if screenshot:
        logger.info("Taking full page screenshot...")
        screenshot_bytes = await page.screenshot(full_page=True)
        response.screenshot = base64.b64encode(screenshot_bytes).decode('utf-8')

    # Get links with better filtering
    if get_links:
        logger.info("Extracting links...")
        links = await page.evaluate(_LINKS_JS)
        response.links = [LinkInfo(**link) for link in links]

    # Lead Generation Extraction
    if lead_generation:
        logger.info("Extracting lead generation data...")
        lead_data_raw = await page.evaluate(_LEAD_DATA_JS)
        response.lead_data = _build_lead_data(lead_data_raw, response.meta_description)

    # Get body content (clean text). Deliberately last: the snippet deletes
    # script/style/noscript nodes from the live DOM, which would otherwise
    # change what the technology scan (outerHTML) sees and could alter the
    # screenshot.
    if get_body:
        logger.info("Extracting body content...")
        response.body_content = await page.evaluate(_BODY_TEXT_JS)

    return response


@app.get("/scrape")
async def scrape_page(
    url: str = Query(..., description="URL to scrape"),
    lead_generation: bool = Query(True, description="Extract lead generation data (emails, phones, business info)"),
    screenshot: bool = Query(True, description="Take a full page screenshot"),
    get_links: bool = Query(True, description="Extract all links from the page"),
    get_body: bool = Query(False, description="Extract body tag content (can be large)")
):
    """Scrape a page with headless Chromium and return the extracted data.

    Raises HTTPException 400 for a non-http(s) URL and HTTPException 500
    when the browser cannot be launched or the scrape itself fails.
    """
    logger.info("Starting scrape for URL: %s", url)
    _validate_url(url)

    try:
        async with async_playwright() as p:
            logger.info("Launching browser...")
            browser = await p.chromium.launch(headless=True, args=_CHROMIUM_ARGS)
            try:
                page = await browser.new_page()
                try:
                    response = await _collect_page_data(
                        page,
                        url,
                        lead_generation=lead_generation,
                        screenshot=screenshot,
                        get_links=get_links,
                        get_body=get_body,
                    )
                except Exception as e:
                    logger.exception("Error during scraping: %s", e)
                    raise HTTPException(status_code=500, detail=f"Scraping error: {e}") from e
                logger.info("Scraping completed successfully")
                return response
            finally:
                # Single cleanup point for both the success and failure paths.
                await browser.close()
    except HTTPException:
        # Already a well-formed API error (e.g. "Scraping error"); without
        # this clause the handler below would re-wrap it as a misleading
        # "Browser launch error".
        raise
    except Exception as e:
        logger.exception("Error launching browser: %s", e)
        raise HTTPException(status_code=500, detail=f"Browser launch error: {e}") from e


# NOTE: disabled Google Maps search endpoint, kept commented out as in the
# original file.
#
# @app.get("/search_leads")
# async def search_leads(
#     query: str = Query(..., description="Search term for business leads")
# ):
#     logger.info(f"Searching Google Maps for: {query}")
#     async with async_playwright() as p:
#         browser = await p.chromium.launch(headless=True)
#         page = await browser.new_page()
#         try:
#             # Go to Google Maps
#             await page.goto("https://www.google.com/maps", wait_until="networkidle")
#             # Accept cookies if present (optional, depends on region)
#             try:
#                 await page.click('button[aria-label="Accept all"]', timeout=180000)
#             except:
#                 pass
#             # Type the query in the search box and press Enter
#             await page.fill('input#searchboxinput', query)
#             await page.click('button#searchbox-searchbutton')
#             # Wait for search results to load - selector for listings container
#             await page.wait_for_selector('div[role="article"]', timeout=180000)
#             # Scroll results container to load more items (optional)
#             # For now, scrape the visible ones
#             # Extract data from listings
#             results = await page.evaluate("""
#                 () => {
#                     const listings = [];
#                     const elements = document.querySelectorAll('div[role="article"]');
#                     elements.forEach(el => {
#                         const nameEl = el.querySelector('h3 span');
#                         const name = nameEl ? nameEl.innerText : null;
#                         const addressEl = el.querySelector('[data-tooltip="Address"]');
#                         const address = addressEl ? addressEl.innerText : null;
#                         const phoneEl = el.querySelector('button[data-tooltip="Copy phone number"]');
#                         const phone = phoneEl ? phoneEl.getAttribute('aria-label')?.replace('Copy phone number ', '') : null;
#                         const websiteEl = el.querySelector('a[aria-label*="Website"]');
#                         const website = websiteEl ? websiteEl.href : null;
#                         listings.push({name, address, phone, website});
#                     });
#                     return listings;
#                 }
#             """)
#             await browser.close()
#             # Filter out empty entries
#             filtered = [r for r in results if r['name']]
#             return {"query": query, "results_count": len(filtered), "results": filtered}
#         except Exception as e:
#             await browser.close()
#             logger.error(f"Error during Google Maps search scraping: {str(e)}")
#             raise HTTPException(status_code=500, detail=f"Search scraping error: {str(e)}")