import asyncio
import os
import re
import json
import time
import zipfile
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Any, Optional, Tuple, Set

import requests
import pandas as pd
from bs4 import BeautifulSoup
import gradio as gr

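# The imports above assume roughly this environment (versions intentionally unpinned):
#   pip install requests pandas beautifulsoup4 gradio playwright openai transformers pillow torch
#   playwright install chromium   # downloads the headless browser used by fetch_dom
# openai / transformers / pillow / torch are only needed for the optional LLM and captioning paths.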
# Tunables
MAX_CONCURRENCY = 4          # max pages scraped in parallel
PLAYWRIGHT_WAIT_MS = 1500    # extra settle time after page load
FETCH_RETRIES = 2            # retries per URL on top of the first attempt
SEARCH_PAGES = 2             # DuckDuckGo pages per query
RESULTS_PER_QUERY = 10       # results kept per query
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36"
)

def openai_extract_json(html: str, url: str, fields: List[str], api_key: Optional[str]) -> Optional[List[Dict[str, Any]]]:
    """Ask OpenAI to extract structured rows from raw HTML. Returns a list of dicts, or None."""
    if not api_key:
        return None
    try:
        from openai import OpenAI

        client = OpenAI(api_key=api_key)
        field_hint = ", ".join(fields) if fields else "title, price, image, rating, url"
        system = (
            "You are a robust web extractor. Given raw HTML and the page URL, "
            "return an array of JSON objects with fields you can infer (and the requested fields if present). "
            "Always output strictly valid JSON with double-quoted keys/strings. Include absolute image URLs if possible."
        )
        user = (
            f"URL: {url}\n\n"
            f"Required fields to attempt: [{field_hint}]\n\n"
            "Return JSON array only. Do not include any commentary.\n\n"
            f"HTML:\n{html[:180000]}"  # cap the HTML so the prompt stays within context limits
        )
        resp = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": system}, {"role": "user", "content": user}],
            temperature=0,
        )
        content = (resp.choices[0].message.content or "").strip()
        # Strip a leading/trailing Markdown code fence if the model added one.
        content = re.sub(r"^```(?:json)?|```$", "", content).strip()
        data = json.loads(content)
        if isinstance(data, dict):
            data = [data]
        if isinstance(data, list):
            return data
        return None
    except Exception as e:
        print("OpenAI extraction failed:", e)
        return None

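# Illustrative only: on success openai_extract_json returns rows shaped roughly like
#   [{"title": "Acme Face Cream", "price": "₹499", "rating": "4.3",
#     "image": "https://example.com/img/cream.jpg", "url": "https://example.com/p/acme"}]
# (field names and values depend on the page and on the requested fields).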
async def _fetch_dom_once(url: str, wait_ms: int) -> str:
    """Render one page with headless Chromium and return its final DOM as HTML."""
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            page = await browser.new_page(user_agent=USER_AGENT)
            await page.goto(url, wait_until="domcontentloaded", timeout=30000)
            try:
                await page.wait_for_load_state("networkidle", timeout=8000)
            except Exception:
                pass  # "networkidle" is best effort; some pages never settle
            if wait_ms > 0:
                await asyncio.sleep(wait_ms / 1000)
            return await page.content()
        finally:
            # Always release the browser, even if navigation failed.
            await browser.close()


async def fetch_dom(url: str, wait_ms: int = PLAYWRIGHT_WAIT_MS, retries: int = FETCH_RETRIES) -> str:
    """Fetch a page with retries and a small backoff between attempts."""
    last_err = None
    for attempt in range(1, retries + 2):
        try:
            return await _fetch_dom_once(url, wait_ms)
        except Exception as e:
            last_err = e
            await asyncio.sleep(0.6 * attempt)
    raise last_err

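# Example usage (assumes Playwright plus a Chromium build installed via `playwright install chromium`):
#   html = asyncio.run(fetch_dom("https://example.com"))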
def extract_images_and_items(html: str, base_url: str, card_selector: Optional[str] = None) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Heuristically pull product-like cards and image URLs out of a page."""
    soup = BeautifulSoup(html, "html.parser")

    # Collect image URLs, resolving lazy-load attributes and relative paths.
    images = []
    for img in soup.find_all("img"):
        src = img.get("src") or img.get("data-src") or img.get("data-original")
        if not src or src.startswith("data:"):
            continue  # skip missing sources and inline data URIs
        abs_src = urljoin(base_url, src)
        images.append(abs_src)

    # Candidate "cards": the user-supplied selector, or a set of common layouts.
    items = []
    if card_selector:
        candidates = soup.select(card_selector)
    else:
        candidates = soup.select(
            "div.product, li.product, div.card, article, div.product-item, "
            "div.s-result-item, div._1AtVbE, div._4ddWXP, div.MuiCard-root, "
            "section, li.grid-item"
        )
        if not candidates:
            candidates = [a.parent for a in soup.select("a img") if a.parent]

    for c in candidates:
        try:
            title = None
            for sel in ["h1", "h2", "h3", ".title", ".product-title", "._4rR01T", ".s1Q9rs", "a[title]"]:
                n = c.select_one(sel)
                if n and n.get_text(strip=True):
                    title = n.get_text(strip=True)
                    break
            if not title:
                img = c.find("img")
                if img and img.get("alt"):
                    title = img.get("alt").strip()

            # Grab the first currency-looking token as the price.
            price = None
            price_text = c.get_text(" ", strip=True)
            m = re.search(r"(?:₹|Rs\.?|INR|\$|€|£)\s?\d[\d,]*(?:\.\d+)?", price_text)
            if m:
                price = m.group(0)

            link = c.find("a")
            href = urljoin(base_url, link.get("href")) if link and link.get("href") else base_url

            img = c.find("img")
            img_src = None
            if img:
                img_src = img.get("src") or img.get("data-src") or img.get("data-original")
                if img_src:
                    img_src = urljoin(base_url, img_src)

            if any([title, price, img_src]):
                items.append({"title": title, "price": price, "url": href, "image": img_src})
        except Exception:
            continue

    # De-duplicate image URLs while preserving order.
    seen = set()
    unique_images = []
    for u in images:
        if u not in seen:
            seen.add(u)
            unique_images.append(u)

    return items, unique_images

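# Sketch of the heuristic path (illustrative values):
#   items, images = extract_images_and_items(html, "https://example.com/shop")
#   items  -> [{"title": "...", "price": "₹499", "url": "https://example.com/p/1", "image": "https://example.com/1.jpg"}, ...]
#   images -> ["https://example.com/1.jpg", "https://example.com/banner.png", ...]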
def download_images(image_urls: List[str], out_dir: str) -> List[str]:
    """Download images sequentially; returns the list of local file paths saved."""
    os.makedirs(out_dir, exist_ok=True)
    saved = []
    s = requests.Session()
    s.headers.update({"User-Agent": USER_AGENT})
    for u in image_urls:
        try:
            # Note: URLs sharing a basename will overwrite each other in out_dir.
            name = os.path.basename(urlparse(u).path) or f"img_{len(saved) + 1}.jpg"
            if not os.path.splitext(name)[1]:
                name += ".jpg"
            path = os.path.join(out_dir, name)
            r = s.get(u, timeout=20)
            if r.status_code == 200 and r.content:
                with open(path, "wb") as f:
                    f.write(r.content)
                saved.append(path)
        except Exception as e:
            print("Image download failed:", u, e)
    return saved

def caption_images(paths: List[str]) -> Dict[str, str]:
    """Caption local images with BLIP if transformers/torch/Pillow are installed; otherwise return {}."""
    try:
        from transformers import BlipProcessor, BlipForConditionalGeneration
        from PIL import Image
        import torch

        device = "cuda" if torch.cuda.is_available() else "cpu"
        # The model is loaded on every call; fine for occasional runs, slow for repeated ones.
        processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
        model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(device)

        captions = {}
        for p in paths:
            try:
                im = Image.open(p).convert("RGB")
                inputs = processor(im, return_tensors="pt").to(device)
                out = model.generate(**inputs, max_new_tokens=40)
                text = processor.decode(out[0], skip_special_tokens=True)
                captions[p] = text
            except Exception as e:
                captions[p] = f"(caption failed: {e})"
        return captions
    except Exception as e:
        print("Captioning unavailable:", e)
        return {}

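# Illustrative result: {"scrape_output/images_1700000000/cream.jpg": "a pink jar of face cream on a table"}
# (paths and captions depend entirely on the downloaded images).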
def zip_paths(paths: List[str], zip_path: str) -> str:
    """Bundle the given files into a ZIP archive and return its path."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for p in paths:
            if os.path.isfile(p):
                zf.write(p, arcname=os.path.basename(p))
    return zip_path

# Site-restricted query fragments that bias results toward ad archives / creative showcases.
ADS_PRESETS = [
    "site:adsoftheworld.com",
    "site:theinspiration.com",
    "site:ads-of-the-world.s3",
    "site:behance.net ad campaign",
    "site:dribbble.com case study ad",
]

# News/industry sources used to add a couple of "signal" variants per query.
NEWS_SIGNAL = [
    "site:news.ycombinator.com", "site:techcrunch.com", "site:theverge.com",
    "site:adage.com", "site:campaignlive.com"
]


def build_queries_from_prompt(prompt: str, include_ads_sources: bool) -> List[str]:
    """Expand a free-text prompt into a handful of search queries (capped at 12)."""
    # Keep only characters that are safe in a search query and collapse whitespace.
    base = re.sub(r"[^a-zA-Z0-9\s:+\-_/\.]", " ", prompt).strip()
    base = re.sub(r"\s+", " ", base)

    core_variants = [
        base,
        f"{base} best examples",
        f"{base} recent campaigns",
        f"{base} case study",
        f"{base} images",
    ]

    queries = []
    for v in core_variants:
        queries.append(v)
        for ns in NEWS_SIGNAL[:2]:
            queries.append(f"{v} {ns}")

    if include_ads_sources:
        for v in core_variants:
            for siteq in ADS_PRESETS:
                queries.append(f"{v} {siteq}")

    # De-duplicate while preserving order, then cap.
    seen = set()
    uniq = []
    for q in queries:
        if q not in seen:
            seen.add(q)
            uniq.append(q)
    return uniq[:12]

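# For a prompt like "gen z pink skincare ad campaign india 2024" this yields variants such as
#   "gen z pink skincare ad campaign india 2024 best examples"
#   "gen z pink skincare ad campaign india 2024 case study site:adsoftheworld.com"
# (illustrative; the exact list depends on include_ads_sources and the 12-query cap).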
def ddg_search(query: str, pages: int = 1) -> List[Tuple[str, str]]:
    """Return a list of (title, url) pairs from DuckDuckGo's HTML results, across `pages` pages."""
    results = []
    session = requests.Session()
    session.headers.update({"User-Agent": USER_AGENT})

    for page in range(pages):
        params = {"q": query}
        if page > 0:
            params["s"] = str(page * 50)  # result offset for subsequent pages
        r = session.get("https://duckduckgo.com/html/", params=params, timeout=20)
        soup = BeautifulSoup(r.text, "html.parser")
        for res in soup.select(".result"):
            a = res.select_one(".result__a")
            if not a:
                continue
            title = a.get_text(strip=True)
            href = a.get("href")
            if not href:
                continue
            results.append((title, href))
    return results

def pick_best_links(all_results: List[Tuple[str, str]], want: int = 10) -> List[str]:
    """
    Simple pragmatic ranking:
    - de-duplicate by URL and domain
    - prefer diverse domains
    """
    picked = []
    seen_urls: Set[str] = set()
    seen_domains: Set[str] = set()

    for _, url in all_results:
        u = url.strip()
        if not u or u in seen_urls:
            continue
        dom = urlparse(u).netloc.lower()
        if dom.startswith("www."):
            dom = dom[4:]

        # Skip DuckDuckGo's own domain (redirect/ad links).
        if dom in {"duckduckgo.com"}:
            continue
        # While fewer than half the wanted links are picked, drop repeat domains to keep variety.
        if dom in seen_domains and len(picked) < want // 2:
            continue

        seen_urls.add(u)
        seen_domains.add(dom)
        picked.append(u)
        if len(picked) >= want:
            break
    return picked

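# Illustrative: with want=10, two adsoftheworld.com results followed by one campaignlive.com result
# keep the first and third; the second is dropped because its domain repeats early.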
def search_links_from_prompt(prompt: str, include_ads_sources: bool, per_query: int, pages: int) -> List[str]:
    """Run every generated query through DuckDuckGo and return a de-duplicated, domain-diverse link list."""
    queries = build_queries_from_prompt(prompt, include_ads_sources)
    all_results: List[Tuple[str, str]] = []
    for q in queries:
        try:
            res = ddg_search(q, pages=pages)
            all_results.extend(res[:per_query])
        except Exception as e:
            print("Search failed for query:", q, e)
            continue

    best = pick_best_links(all_results, want=max(5, per_query * 2))
    return best

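# Example call (illustrative values):
#   links = search_links_from_prompt("organic skincare ads 2024", True, per_query=6, pages=2)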
async def scrape_one(url: str, fields: List[str], use_llm: bool, api_key: Optional[str],
                     card_selector: Optional[str], log: List[str], sem: asyncio.Semaphore) -> Dict[str, Any]:
    """Fetch, parse, and (optionally) LLM-extract a single URL; never raises, always returns a dict."""
    async with sem:
        try:
            html = await fetch_dom(url)
        except Exception as e:
            log.append(f"[ERROR] Failed to load: {url} -> {e}")
            return {"url": url, "html": "", "items": [], "images": [], "llm_rows": []}

        items, images = [], []
        try:
            items, images = extract_images_and_items(html, url, card_selector)
        except Exception as e:
            log.append(f"[WARN] Parse issue on: {url} -> {e}")

        llm_rows = []
        if use_llm:
            try:
                # Note: this OpenAI call is synchronous and blocks the event loop while it runs.
                llm_rows = openai_extract_json(html, url, fields, api_key) or []
            except Exception as e:
                log.append(f"[WARN] LLM extraction failed: {url} -> {e}")

        return {"url": url, "html": html, "items": items, "images": images, "llm_rows": llm_rows}

def to_dataframe(rows: List[Dict[str, Any]]) -> pd.DataFrame:
    """Build a DataFrame from heterogeneous row dicts, putting common columns first."""
    if not rows:
        return pd.DataFrame()
    all_keys = set()
    for r in rows:
        all_keys.update(r.keys())
    ordered = []
    for r in rows:
        d = {k: r.get(k) for k in all_keys}
        ordered.append(d)
    df = pd.DataFrame(ordered)
    preferred = [k for k in ["title", "name", "price", "rating", "image", "url"] if k in df.columns]
    others = [c for c in df.columns if c not in preferred]
    df = df[preferred + others]
    return df

def run_scrape(input_mode: str,
               prompt_or_urls: str,
               fields_text: str,
               card_selector: str,
               include_ads_sources: bool,
               per_query_results: int,
               search_pages: int,
               use_llm: bool,
               api_key: str,
               download_imgs: bool,
               do_caption: bool):
    """Top-level pipeline wired to the Gradio button; returns (df, gallery, json, csv, zip, status, logs)."""
    start = time.time()
    log: List[str] = []

    # Resolve the URL list either from a prompt (via search) or from pasted URLs.
    if input_mode == "Prompt":
        if not prompt_or_urls.strip():
            return pd.DataFrame(), [], None, None, None, "Enter a prompt.", "No prompt given."
        log.append(f"[INFO] Building queries from prompt: {prompt_or_urls!r}")
        urls = search_links_from_prompt(
            prompt_or_urls.strip(),
            include_ads_sources=include_ads_sources,
            per_query=per_query_results,
            pages=max(1, search_pages),
        )
        if not urls:
            return pd.DataFrame(), [], None, None, None, "No links found.", "\n".join(log)
        log.append(f"[INFO] Selected {len(urls)} links from search.")
    else:
        urls = [u.strip() for u in prompt_or_urls.splitlines() if u.strip()]
        if not urls:
            return pd.DataFrame(), [], None, None, None, "Enter at least one URL.", "No URLs supplied."
        log.append(f"[INFO] Using {len(urls)} direct URL(s).")

    fields = [f.strip() for f in fields_text.split(",")] if fields_text.strip() else []

    out_dir = os.path.abspath("scrape_output")
    os.makedirs(out_dir, exist_ok=True)

    # Scrape all URLs concurrently, bounded by the semaphore.
    sem = asyncio.Semaphore(MAX_CONCURRENCY)

    async def gather_all():
        tasks = [
            scrape_one(u, fields, use_llm, api_key if use_llm else None, card_selector or None, log, sem)
            for u in urls
        ]
        return await asyncio.gather(*tasks)

    try:
        scraped = asyncio.run(gather_all())
    except RuntimeError:
        # asyncio.run() refuses to start when an event loop is already running in this
        # thread (e.g. notebook environments); retry on a fresh loop in a worker thread.
        from concurrent.futures import ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=1) as pool:
            scraped = pool.submit(lambda: asyncio.run(gather_all())).result()
    except Exception as e:
        log.append(f"[FATAL] Async run failed: {e}")
        return pd.DataFrame(), [], None, None, None, "Run failed.", "\n".join(log)

    # Merge per-URL results.
    heuristic_rows: List[Dict[str, Any]] = []
    llm_rows: List[Dict[str, Any]] = []
    all_images: List[str] = []

    for s in scraped:
        if not isinstance(s, dict):
            continue
        heuristic_rows.extend(s.get("items", []))
        llm_rows.extend(s.get("llm_rows", []))
        all_images.extend(s.get("images", []))

    # Prefer LLM rows when the LLM was used and produced anything; otherwise fall back to heuristics.
    rows = llm_rows if use_llm and llm_rows else heuristic_rows
    df = to_dataframe(rows)

    # Persist the raw rows and the table.
    ts = int(time.time())
    json_path = os.path.join(out_dir, f"scrape_{ts}.json")
    csv_path = os.path.join(out_dir, f"scrape_{ts}.csv")
    try:
        df.to_csv(csv_path, index=False)
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(rows, f, ensure_ascii=False, indent=2)
    except Exception as e:
        log.append(f"[WARN] Failed to save CSV/JSON: {e}")
        json_path = None
        csv_path = None

    # Optional image download / captioning / zipping.
    gallery_paths, zip_path = [], None
    if download_imgs and all_images:
        try:
            img_dir = os.path.join(out_dir, f"images_{ts}")
            saved = download_images(all_images, img_dir)
            gallery_paths = saved[:120]
            if do_caption and saved:
                try:
                    captions_map = caption_images(saved)
                    if not df.empty:
                        img_col = None
                        for c in df.columns:
                            if c.lower() in ("image", "image_url", "img", "imageurl"):
                                img_col = c
                                break
                        if img_col:
                            def _map_caption(u):
                                # Map a row's image URL back to its downloaded file, then to its caption.
                                if not u:
                                    return ""
                                fname = os.path.basename(urlparse(str(u)).path)
                                return captions_map.get(os.path.join(img_dir, fname), "")

                            df["caption"] = df[img_col].map(_map_caption)
                            # Re-save the table now that it carries a caption column.
                            if csv_path and json_path:
                                df.to_csv(csv_path, index=False)
                                with open(json_path, "w", encoding="utf-8") as f:
                                    json.dump(json.loads(df.to_json(orient="records")), f, ensure_ascii=False, indent=2)
                except Exception as e:
                    log.append(f"[WARN] Captioning failed: {e}")

            zip_path = os.path.join(out_dir, f"images_{ts}.zip")
            try:
                zip_paths(saved, zip_path)
            except Exception as e:
                log.append(f"[WARN] ZIP failed: {e}")
                zip_path = None
        except Exception as e:
            log.append(f"[WARN] Image pipeline failed: {e}")

    elapsed = round(time.time() - start, 2)
    gallery_data = [(p, os.path.basename(p)) for p in gallery_paths]
    status = f"Scraped {len(urls)} URL(s) • Rows: {len(df)} • Images found: {len(all_images)} • Time: {elapsed}s"
    return (
        df,
        gallery_data,
        json_path if json_path and os.path.isfile(json_path) else None,
        csv_path if csv_path and os.path.isfile(csv_path) else None,
        zip_path if zip_path and os.path.isfile(zip_path) else None,
        status,
        "\n".join(log) if log else "OK",
    )

with gr.Blocks(title="AI Scraper: Prompt → Best Links → Text+Images", css=".gradio-container {max-width: 1200px !important}") as demo:
    gr.Markdown("""
# 🏷️ AI-Powered Prompt Scraper (2025)
- Give a **prompt** (e.g., "Gen Z pink organic skincare ad campaign in India 2024")
  → we search smartly, pick strong links (optionally ad archives), and scrape **text + images**.
- Or switch to **Direct URLs** mode and paste URLs.
- Optional **LLM semantic parsing** to structured JSON.
""")

    with gr.Row():
        input_mode = gr.Radio(choices=["Prompt", "Direct URLs"], value="Prompt", label="Input Mode")

    with gr.Row():
        prompt_or_urls = gr.Textbox(
            label="Prompt (or URLs if in Direct mode)",
            placeholder="e.g., gen z pink skincare ad campaign india 2024"
        )

    with gr.Row():
        fields = gr.Textbox(label="Fields to extract (comma-separated)", placeholder="title, price, image, rating, url")
        card_selector = gr.Textbox(label="Optional CSS selector for item cards", placeholder="div.product, article, .card")

    with gr.Row():
        include_ads_sources = gr.Checkbox(label="Bias search towards ad archives/sources", value=True)
        per_query_results = gr.Slider(1, 15, value=6, step=1, label="Top results to keep per query")
        search_pages = gr.Slider(1, 3, value=2, step=1, label="Search pages per query (DDG)")

    with gr.Row():
        use_llm = gr.Checkbox(label="Use OpenAI for semantic extraction", value=False)
        api_key = gr.Textbox(label="OpenAI API Key (if using LLM)", type="password")

    with gr.Row():
        download_imgs = gr.Checkbox(label="Download images", value=True)
        do_caption = gr.Checkbox(label="Caption images (slow)", value=False)

    run_btn = gr.Button("🚀 Run Scraper", variant="primary")

    with gr.Row():
        table = gr.Dataframe(label="Extracted Data (preview)", interactive=False)
        gallery = gr.Gallery(label="Scraped Images (subset)", show_label=True, height=420, allow_preview=True)

    with gr.Row():
        json_file = gr.File(label="Download JSON")
        csv_file = gr.File(label="Download CSV")
        zip_file = gr.File(label="Download Images ZIP")

    status = gr.Markdown("Ready.")
    logs = gr.Textbox(label="Run Logs", lines=10)

    run_btn.click(
        fn=run_scrape,
        inputs=[
            input_mode, prompt_or_urls, fields, card_selector,
            include_ads_sources, per_query_results, search_pages,
            use_llm, api_key, download_imgs, do_caption
        ],
        outputs=[table, gallery, json_file, csv_file, zip_file, status, logs]
    )

if __name__ == "__main__":
    demo.launch()
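    # Optional: demo.launch(share=True) or demo.launch(server_name="0.0.0.0", server_port=7860)
    # are standard Gradio launch flags for exposing the app beyond localhost.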