from fastapi import FastAPI, HTTPException, Query
from playwright.async_api import async_playwright
from urllib.parse import urlparse
from typing import List, Set
import re

app = FastAPI()

# Cache of JavaScript file URLs that have already been fetched and scanned,
# so the same script is not downloaded and parsed more than once.
js_cache: Set[str] = set()


def extract_possible_endpoints(text: str) -> List[str]:
    """Pull absolute URLs out of text and keep the ones that look like API endpoints."""
    pattern = re.compile(r'https?://[^\s"\'<>]+')
    urls = pattern.findall(text)
    return list({
        url for url in urls
        if '/api/' in url or re.search(r'\.(json|php|xml|ajax|aspx|jsp)', url)
    })

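# Illustrative example (the input string is hypothetical, not from the original code):
#   extract_possible_endpoints('fetch("https://shop.example/api/cart"); <img src="https://shop.example/logo.png">')
# returns ["https://shop.example/api/cart"]: the image URL is dropped because it matches
# neither the '/api/' substring nor the extension filter above.

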
async def extract_internal_links(page, base_url: str) -> List[str]:
    """Collect links from every <a href> on the current page that stay on the same domain."""
    anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)')
    domain = urlparse(base_url).netloc
    internal_links = [link for link in anchors if urlparse(link).netloc == domain]
    return list(set(internal_links))


async def scrape_page_for_endpoints(page, url: str) -> List[str]:
    """Visit a page and collect candidate API endpoints from its network traffic and JS files."""
    found_endpoints = []
    network_logs = []

    def handle_request(req):
        network_logs.append(req.url)

    # Attach the listener before navigating so requests fired during page load are captured.
    page.on("request", handle_request)

    try:
        await page.goto(url, timeout=60000)
        await page.wait_for_timeout(2000)

        # Download every external script referenced by the page and scan it for endpoint-like URLs.
        js_urls = await page.eval_on_selector_all(
            'script[src]',
            "elements => elements.map(el => el.src)"
        )

        js_based_endpoints = []
        for js_url in js_urls:
            if js_url in js_cache:
                continue
            js_cache.add(js_url)

            try:
                response = await page.request.get(js_url)
                if response.ok:
                    body = await response.text()
                    js_based_endpoints.extend(extract_possible_endpoints(body))
            except Exception:
                continue

        # Requests observed on the wire are another source of endpoints.
        network_endpoints = extract_possible_endpoints('\n'.join(network_logs))
        found_endpoints = list(set(js_based_endpoints + network_endpoints))

    except Exception as e:
        print(f"[!] Failed to scrape {url}: {e}")
    finally:
        # Detach the handler so listeners do not pile up across pages.
        page.remove_listener("request", handle_request)

    return found_endpoints


@app.get("/scrape-api-endpoints")
async def scrape_api_endpoints(
    website: str = Query(..., description="Website URL to scrape"),
    max_depth: int = Query(1, description="Max depth of link crawling (0 = base page only)")
):
    try:
        visited = set()
        all_endpoints = []

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()

            # Breadth-first crawl starting from the base page.
            queue = [(website, 0)]

            while queue:
                current_url, depth = queue.pop(0)
                if current_url in visited or depth > max_depth:
                    continue
                visited.add(current_url)

                print(f"[+] Scraping: {current_url}")
                endpoints = await scrape_page_for_endpoints(page, current_url)
                all_endpoints.extend(endpoints)

                # Queue same-domain links found on the page that was just scraped.
                if depth < max_depth:
                    try:
                        internal_links = await extract_internal_links(page, website)
                        for link in internal_links:
                            if link not in visited:
                                queue.append((link, depth + 1))
                    except Exception as e:
                        print(f"[!] Link extraction failed: {e}")

            await browser.close()

        return {
            "website": website,
            "pages_visited": len(visited),
            "total_endpoints_found": len(set(all_endpoints)),
            "api_endpoints": list(set(all_endpoints)),
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
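
# --- Running the service (a sketch; the module name "main" is an assumption) ---
# Dependencies: pip install fastapi uvicorn playwright
#               playwright install chromium
# Start with the uvicorn CLI:  uvicorn main:app --reload
# Example request:
#   curl "http://localhost:8000/scrape-api-endpoints?website=https://example.com&max_depth=1"
if __name__ == "__main__":
    # Alternatively, run this file directly to start the server programmatically.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)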