from fastapi import FastAPI, HTTPException, Query
from playwright.async_api import async_playwright
from urllib.parse import urlparse
from typing import List, Set
import re

app = FastAPI()

# Module-level crawl state: cleared at the start of each request, so concurrent
# requests to this endpoint will interfere with each other.
visited_links: Set[str] = set()

# Heuristic patterns for contact details; they favor recall over precision,
# so expect some false positives (especially for phone numbers).
email_pattern = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+")
phone_pattern = re.compile(r"\+?\d[\d\s().-]{7,}\d")
social_pattern = re.compile(r"https?://(?:www\.)?(?:facebook|linkedin|twitter|instagram)\.com/[^\s\"'<>]+")


def extract_matches(pattern, text):
    """Return unique, non-empty matches of a compiled pattern in a block of text."""
    return list(set(filter(lambda x: x and x.strip(), pattern.findall(text))))


async def extract_internal_links(page, base_url: str) -> List[str]:
    """Collect same-domain links on the current page that have not been visited yet."""
    anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)')
    domain = urlparse(base_url).netloc
    internal_links = [
        link for link in anchors
        if urlparse(link).netloc == domain and link not in visited_links
    ]
    return list(set(internal_links))


async def scrape_contacts_from_page(page, url: str):
    """Load a single page and pull emails, phone numbers, and social links from its HTML."""
    contacts = {"emails": [], "phones": [], "socials": []}

    try:
        await page.goto(url, timeout=30000)
        # Give client-side rendering a moment to settle before reading the HTML.
        await page.wait_for_timeout(1500)

        content = await page.content()

        contacts["emails"] = extract_matches(email_pattern, content)
        contacts["phones"] = extract_matches(phone_pattern, content)
        contacts["socials"] = extract_matches(social_pattern, content)

    except Exception as e:
        # A single failing page should not abort the whole crawl.
        print(f"[!] Failed at {url}: {e}")

    return contacts
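

# Hypothetical local smoke test for scrape_contacts_from_page, kept commented out so the
# module stays import-safe. It assumes Playwright's Chromium has been installed
# (`playwright install chromium`); the _demo name and the example URL are placeholders.
#
# import asyncio
#
# async def _demo():
#     async with async_playwright() as p:
#         browser = await p.chromium.launch(headless=True)
#         page = await browser.new_page()
#         print(await scrape_contacts_from_page(page, "https://example.com"))
#         await browser.close()
#
# asyncio.run(_demo())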


@app.get("/scrape-contacts")
async def scrape_contacts(
    website: str = Query(..., description="Base website URL"),
    max_depth: int = Query(1, description="How deep to crawl (recommended: 1 or 2)")
):
    try:
        all_emails, all_phones, all_socials = set(), set(), set()
        visited_links.clear()

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()

            # Breadth-first crawl queue of (url, depth) pairs, starting at the base URL.
            queue = [(website, 0)]

            while queue:
                current_url, depth = queue.pop(0)
                if current_url in visited_links or depth > max_depth:
                    continue
                visited_links.add(current_url)

                print(f"[+] Crawling: {current_url}")
                data = await scrape_contacts_from_page(page, current_url)
                all_emails.update(data["emails"])
                all_phones.update(data["phones"])
                all_socials.update(data["socials"])

                if depth < max_depth:
                    try:
                        # Only follow internal links that look contact-related.
                        internal_links = await extract_internal_links(page, website)
                        for link in internal_links:
                            if any(x in link.lower() for x in ["contact", "about", "support"]):
                                queue.append((link, depth + 1))
                    except Exception as e:
                        print(f"[!] Link extraction failed at {current_url}: {e}")

            await browser.close()

        return {
            "website": website,
            "pages_visited": len(visited_links),
            "emails": list(all_emails),
            "phone_numbers": list(all_phones),
            "social_profiles": list(all_socials)
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
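

if __name__ == "__main__":
    # Convenience launcher: a minimal sketch, assuming uvicorn is installed
    # (any ASGI server works; host and port are arbitrary choices).
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

# Example request once the server is running (the target URL is a placeholder):
#   curl "http://localhost:8000/scrape-contacts?website=https://example.com&max_depth=1"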