from fastapi import FastAPI, HTTPException, Query from playwright.async_api import async_playwright from urllib.parse import urlparse from typing import List, Set import re app = FastAPI() visited_links: Set[str] = set() # Improved regex patterns email_pattern = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+") phone_pattern = re.compile(r"\+?\d[\d\s().-]{7,}\d") social_pattern = re.compile(r"https?://(?:www\.)?(?:facebook|linkedin|twitter|instagram)\.com/[^\s\"'<>]+") def extract_matches(pattern, text): return list(set(filter(lambda x: x and x.strip(), pattern.findall(text)))) async def extract_internal_links(page, base_url: str) -> List[str]: anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)') domain = urlparse(base_url).netloc internal_links = [ link for link in anchors if urlparse(link).netloc == domain and link not in visited_links ] return list(set(internal_links)) async def scrape_contacts_from_page(page, url: str): contacts = {"emails": [], "phones": [], "socials": []} try: await page.goto(url, timeout=30000) await page.wait_for_timeout(1500) content = await page.content() contacts["emails"] = extract_matches(email_pattern, content) contacts["phones"] = extract_matches(phone_pattern, content) contacts["socials"] = extract_matches(social_pattern, content) except Exception as e: print(f"[!] Failed at {url}: {e}") return contacts @app.get("/scrape-contacts") async def scrape_contacts( website: str = Query(..., description="Base website URL"), max_depth: int = Query(1, description="How deep to crawl (recommended: 1 or 2)") ): try: all_emails, all_phones, all_socials = set(), set(), set() visited_links.clear() async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context() page = await context.new_page() queue = [(website, 0)] while queue: current_url, depth = queue.pop(0) if current_url in visited_links or depth > max_depth: continue visited_links.add(current_url) print(f"[+] Crawling: {current_url}") data = await scrape_contacts_from_page(page, current_url) all_emails.update(data["emails"]) all_phones.update(data["phones"]) all_socials.update(data["socials"]) if depth < max_depth: try: internal_links = await extract_internal_links(page, website) for link in internal_links: if any(x in link.lower() for x in ["contact", "about", "support"]): queue.append((link, depth + 1)) except Exception as e: print(f"[!] Link extraction failed at {current_url}: {e}") await browser.close() return { "website": website, "pages_visited": len(visited_links), "emails": list(all_emails), "phone_numbers": list(all_phones), "social_profiles": list(all_socials) } except Exception as e: raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")