import time
import logging
import argparse
import os
import json
import random
import re
import uuid
from collections import Counter, defaultdict
from datetime import datetime
from typing import List, Dict, Any, Optional, Union, Tuple
from urllib.parse import urlencode

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException, NoSuchElementException, WebDriverException
)

import gradio as gr
import pandas as pd

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Google Ads transparency advertiser IDs for the lightsaber brands exposed
# in the Google Ads tab of the UI.
ADVERTISERS = [
    {"id": "AR10051102910143528961", "name": "Theory Sabers"},
    {"id": "AR12645693856247971841", "name": "Artsabers"},
    {"id": "AR07257050693515608065", "name": "bmlightsabers"},
    {"id": "AR01506694249926623233", "name": "Padawan Outpost Ltd"},
    {"id": "AR10584025853845307393", "name": "GalaxySabers"},
    {"id": "AR16067963414479110145", "name": "nsabers"},
    {"id": "AR12875519274243850241", "name": "es-sabers"},
    {"id": "AR05144647067079016449", "name": "Ultra Sabers"},
    {"id": "AR15581800501283389441", "name": "SuperNeox"},
    {"id": "AR06148907109187584001", "name": "Sabertrio"}
]

# Facebook Ad Library scraping parameters
FB_DEFAULT_TIMEOUT = 60        # page-load / explicit-wait ceiling, seconds
FB_MIN_WAIT_TIME = 1           # lower bound for randomized pauses, seconds
FB_MAX_WAIT_TIME = 3           # upper bound for randomized pauses, seconds
FB_MAX_SCROLL_ATTEMPTS = 5     # cap on human-like scroll passes
FB_SELECTOR_HISTORY_FILE = "fb_selector_stats.json"
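
# The selector-history file has this shape (illustrative example; the real
# file is created automatically on first run by SelectorStats below):
#
#   {
#     "facebook": {
#       "selectors": {
#         "div[class*='_7jvw']": {"successes": 12, "attempts": 15}
#       },
#       "last_updated": "2024-01-01T00:00:00"
#     }
#   }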

# A small pool of realistic desktop user agents; one is picked per session
# to vary the browser fingerprint.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/118.0"
]

# Common desktop viewport sizes, also randomized per session.
VIEWPORT_SIZES = [
    (1366, 768),
    (1920, 1080),
    (1536, 864),
    (1440, 900)
]


class SelectorStats:
    """Track selector performance so the scraper can self-heal when
    Facebook's markup changes."""

    def __init__(self, file_path=FB_SELECTOR_HISTORY_FILE):
        self.file_path = file_path
        self.stats = self._load_stats()

    def _load_stats(self) -> Dict:
        """Load stats from file, or initialize fresh stats if none exist"""
        if os.path.exists(self.file_path):
            try:
                with open(self.file_path, 'r') as f:
                    return json.load(f)
            except (json.JSONDecodeError, IOError) as e:
                logger.warning(f"Error loading selector stats: {e}, initializing new stats")

        return {
            "facebook": {"selectors": {}, "last_updated": datetime.now().isoformat()}
        }

    def update_selector_success(self, selector: str, count: int = 1) -> None:
        """Record successful use of a selector"""
        platform = "facebook"
        if platform not in self.stats:
            self.stats[platform] = {"selectors": {}, "last_updated": datetime.now().isoformat()}

        if selector not in self.stats[platform]["selectors"]:
            self.stats[platform]["selectors"][selector] = {"successes": 0, "attempts": 0}

        self.stats[platform]["selectors"][selector]["successes"] += count
        self.stats[platform]["selectors"][selector]["attempts"] += 1
        self.stats[platform]["last_updated"] = datetime.now().isoformat()

        self._save_stats()

    def update_selector_attempt(self, selector: str) -> None:
        """Record an attempt to use a selector, regardless of success"""
        platform = "facebook"
        if platform not in self.stats:
            self.stats[platform] = {"selectors": {}, "last_updated": datetime.now().isoformat()}

        if selector not in self.stats[platform]["selectors"]:
            self.stats[platform]["selectors"][selector] = {"successes": 0, "attempts": 0}

        self.stats[platform]["selectors"][selector]["attempts"] += 1
        self.stats[platform]["last_updated"] = datetime.now().isoformat()
        # Note: attempts are not flushed to disk here; they are persisted by
        # the next successful update or by close().

    def get_best_selectors(self, min_attempts: int = 3, max_count: int = 10) -> List[str]:
        """Return the best-performing Facebook selectors, ordered by success rate"""
        platform = "facebook"
        if platform not in self.stats:
            return []

        selectors = []
        for selector, data in self.stats[platform]["selectors"].items():
            if data["attempts"] >= min_attempts:
                success_rate = data["successes"] / data["attempts"] if data["attempts"] > 0 else 0
                selectors.append((selector, success_rate))

        selectors.sort(key=lambda x: x[1], reverse=True)
        return [s[0] for s in selectors[:max_count]]

    def _save_stats(self) -> None:
        """Save stats to file"""
        try:
            with open(self.file_path, 'w') as f:
                json.dump(self.stats, f, indent=2)
        except IOError as e:
            logger.error(f"Error saving selector stats: {e}")


class FacebookAdsScraper:
    def __init__(self, headless=True, debug_mode=False):
        """Initialize the ads scraper with browser configuration"""
        self.debug_mode = debug_mode
        self.headless = headless
        self.driver = self._setup_driver(headless)
        # Selector performance history, used by the self-healing lookup
        self.selector_stats = SelectorStats()
        # Navigation history for this session
        self.navigation_history = []
        # Per-operation success/failure tallies
        self.success_rate = defaultdict(lambda: {"success": 0, "failure": 0})
        # Short random ID to tag this scraping session
        self.session_id = str(uuid.uuid4())[:8]

    def _setup_driver(self, headless):
        """Set up and configure the Chrome WebDriver with anti-detection measures"""
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")

        # Randomize the user agent per session
        user_agent = random.choice(USER_AGENTS)
        chrome_options.add_argument(f"--user-agent={user_agent}")
        logger.info(f"Using user agent: {user_agent}")

        # Randomize the viewport size per session
        viewport_width, viewport_height = random.choice(VIEWPORT_SIZES)
        chrome_options.add_argument(f"--window-size={viewport_width},{viewport_height}")
        logger.info(f"Using viewport size: {viewport_width}x{viewport_height}")

        # Standard anti-detection and stability flags
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-gpu")
        chrome_options.add_argument("--start-maximized")
        chrome_options.add_argument("--enable-unsafe-swiftshader")

        chrome_options.add_argument("--disable-extensions")
        chrome_options.add_argument("--disable-notifications")
        chrome_options.add_argument("--blink-settings=imagesEnabled=true")

        # Hide the "controlled by automated test software" banner
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option("useAutomationExtension", False)

        chrome_options.add_experimental_option("prefs", {
            "profile.default_content_setting_values.notifications": 2,
            "profile.managed_default_content_settings.images": 1,
            "profile.managed_default_content_settings.cookies": 1,
            # Slightly randomized prefs to vary the profile fingerprint
            "profile.default_content_setting_values.plugins": random.randint(1, 3),
            "profile.default_content_setting_values.popups": random.randint(1, 2)
        })

        try:
            service = Service()
            driver = webdriver.Chrome(service=service, options=chrome_options)
        except TypeError:
            # Older Selenium releases don't accept a Service object
            driver = webdriver.Chrome(options=chrome_options)

        # Inject stealth JavaScript before any page script runs. This is
        # best-effort: if the CDP call fails we keep the driver we already
        # have rather than creating (and leaking) a second one.
        try:
            driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                "source": """
                Object.defineProperty(navigator, 'webdriver', {
                    get: () => undefined
                });

                // Overwrite the languages with random order
                Object.defineProperty(navigator, 'languages', {
                    get: () => ['en-US', 'en', 'de'].sort(() => 0.5 - Math.random())
                });

                // Report a randomized, plausible-looking plugins list
                Object.defineProperty(navigator, 'plugins', {
                    get: () => {
                        // Randomize plugins length between 3 and 7
                        const len = Math.floor(Math.random() * 5) + 3;
                        const plugins = { length: len };
                        for (let i = 0; i < len; i++) {
                            plugins[i] = {
                                name: ['Flash', 'Chrome PDF Plugin', 'Native Client', 'Chrome PDF Viewer'][Math.floor(Math.random() * 4)],
                                filename: ['internal-pdf-viewer', 'mhjfbmdgcfjbbpaeojofohoefgiehjai', 'internal-nacl-plugin'][Math.floor(Math.random() * 3)]
                            };
                        }
                        return plugins;
                    }
                });
                """
            })
        except Exception as e:
            logger.warning(f"CDP command failed, continuing: {e}")

        driver.set_page_load_timeout(FB_DEFAULT_TIMEOUT)
        return driver
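
    # Quick way to spot-check the stealth patches from a REPL (illustrative):
    #
    #   scraper = FacebookAdsScraper(headless=True)
    #   scraper.driver.get("https://www.example.com")
    #   scraper.driver.execute_script("return navigator.webdriver")  # -> None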

    def random_wait(self, min_time=None, max_time=None):
        """Wait for a random amount of time to simulate human behavior"""
        min_time = min_time or FB_MIN_WAIT_TIME
        max_time = max_time or FB_MAX_WAIT_TIME
        wait_time = random.uniform(min_time, max_time)
        time.sleep(wait_time)
        return wait_time

    def human_like_scroll(self, scroll_attempts=None):
        """Scroll down the page in a human-like way"""
        attempts = scroll_attempts or random.randint(3, FB_MAX_SCROLL_ATTEMPTS)

        initial_height = self.driver.execute_script("return document.body.scrollHeight")

        for i in range(attempts):
            # Scroll a random fraction of the viewport each pass
            scroll_percent = random.uniform(0.25, 0.9)
            viewport_height = self.driver.execute_script("return window.innerHeight")
            scroll_amount = int(viewport_height * scroll_percent)

            # Break the scroll into small eased steps instead of one jump
            scroll_steps = random.randint(5, 15)
            current_position = self.driver.execute_script("return window.pageYOffset")
            target_position = current_position + scroll_amount

            for step in range(scroll_steps):
                t = (step + 1) / scroll_steps
                # Smoothstep easing: starts and ends slowly, like a human flick
                factor = t * t * (3.0 - 2.0 * t)
                next_position = current_position + (target_position - current_position) * factor
                self.driver.execute_script(f"window.scrollTo(0, {next_position})")
                time.sleep(random.uniform(0.01, 0.05))

            # Occasionally pause longer, as if reading
            if random.random() < 0.3:
                self.random_wait(1.5, 3.5)
            else:
                self.random_wait(0.5, 1.5)

            logger.info(f"Human-like scroll {i + 1}/{attempts} completed")

            # If the page height stopped growing, force one jump to the
            # bottom to trigger any remaining lazy-loaded content
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == initial_height and i > 1:
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
                self.random_wait()
            initial_height = new_height
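
    # The easing above is the standard smoothstep polynomial f(t) = 3t^2 - 2t^3:
    # f(0) = 0, f(1) = 1, and f'(0) = f'(1) = 0, so each scroll accelerates
    # gently and decelerates to a stop rather than moving at constant speed.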

    def simulate_human_behavior(self):
        """Simulate random human-like interactions with the page"""
        # Usually hover over a random element
        if random.random() < 0.7:
            try:
                elements = self.driver.find_elements(By.CSS_SELECTOR, "a, button, input, div")
                if elements:
                    element = random.choice(elements)
                    ActionChains(self.driver).move_to_element(element).perform()
                    self.random_wait(0.2, 1.0)
            except Exception:
                # Hovering is cosmetic; ignore stale or unreachable elements
                pass

        # Occasionally click a "safe" text element that shouldn't navigate away
        if random.random() < 0.2:
            try:
                safe_elements = self.driver.find_elements(By.CSS_SELECTOR, "p, h1, h2, h3, h4, span")
                if safe_elements:
                    safe_element = random.choice(safe_elements)
                    ActionChains(self.driver).move_to_element(safe_element).click().perform()
                    self.random_wait(0.2, 1.0)
            except Exception:
                pass

    def check_headless_visibility(self):
        """
        Verify that pages render in headless mode. Returns True once a
        working driver is available, falling back to a visible browser
        if the headless check fails.
        """
        if not self.headless:
            return True

        logger.info("Performing headless visibility check...")

        test_url = "https://www.example.com"
        try:
            self.driver.get(test_url)
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            logger.info("Headless check passed: Page loaded successfully")
            return True

        except Exception as e:
            logger.error(f"Headless check failed: {e}")

            # Fall back to a visible browser window
            logger.info("Switching to non-headless mode...")
            self.driver.quit()
            self.headless = False
            self.driver = self._setup_driver(headless=False)
            return True

    def fetch_facebook_ads(self, query):
        """Fetch ads from Facebook's Ad Library with anti-detection measures"""
        ads_data = []
        base_url = "https://www.facebook.com/ads/library/"

        logger.info(f"Fetching Facebook ads for {query}")

        try:
            params = {
                "active_status": "all",
                "ad_type": "all",
                "country": "ALL",
                "q": query,
                # Cache-buster plus a session tag for easier log correlation
                "_": int(time.time() * 1000),
                "session_id": self.session_id
            }

            # urlencode() escapes spaces and special characters in the query
            url = base_url + "?" + urlencode(params)
            logger.info(f"Navigating to Facebook URL: {url}")

            self.driver.get(url)

            # Wait for either the main content region or at least the body
            try:
                WebDriverWait(self.driver, FB_DEFAULT_TIMEOUT).until(
                    EC.any_of(
                        EC.presence_of_element_located((By.CSS_SELECTOR, "div[role='main']")),
                        EC.presence_of_element_located((By.TAG_NAME, "body"))
                    )
                )
            except TimeoutException:
                logger.warning("Timeout waiting for Facebook page to load initially, continuing anyway")

            # Scroll to trigger lazy loading, then add some interaction noise
            self.human_like_scroll()
            self.simulate_human_behavior()

            if self.debug_mode:
                self._save_debug_data("facebook_after_scroll", query)

            ad_elements = self._find_facebook_ad_elements()

            if not ad_elements:
                logger.info("No Facebook ads found")
                if self.debug_mode:
                    self._save_debug_data("facebook_no_ads", query)
                return self._generate_placeholder_facebook_data(query)

            # Parse at most the first 10 ad cards
            for i, ad in enumerate(ad_elements[:10]):
                try:
                    ad_data = {
                        "platform": "Facebook",
                        "query": query,
                        "timestamp": datetime.now().isoformat(),
                        "index": i + 1,
                        "session_id": self.session_id
                    }

                    full_text = ad.text.strip()

                    if i == 0:
                        logger.info(f"First Facebook ad full text (first 150 chars): {full_text[:150]}...")

                    extracted_data = self._extract_facebook_ad_data(ad, full_text)
                    ad_data.update(extracted_data)

                    # Guarantee the keys the downstream formatting relies on
                    if "advertiser" not in ad_data or not ad_data["advertiser"]:
                        ad_data["advertiser"] = "Unknown Advertiser"
                    if "text" not in ad_data or not ad_data["text"]:
                        ad_data["text"] = "Ad content not available"

                    ads_data.append(ad_data)

                except Exception as e:
                    logger.warning(f"Error processing Facebook ad {i + 1}: {e}")

            return ads_data if ads_data else self._generate_placeholder_facebook_data(query)

        except Exception as e:
            logger.error(f"Error fetching Facebook ads: {e}")
            if self.debug_mode:
                self._save_debug_data("facebook_error", query)
            return self._generate_placeholder_facebook_data(query)
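
    # Each dict returned by fetch_facebook_ads() carries at least these keys
    # (illustrative values):
    #
    #   {"platform": "Facebook", "query": "lightsaber", "timestamp": "...",
    #    "index": 1, "session_id": "ab12cd34", "advertiser": "...",
    #    "text": "...", "status": "Active"}   # "status" only when detected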

    def _find_facebook_ad_elements(self):
        """Find Facebook ad elements using a self-healing selector strategy"""
        # Try historically successful selectors first...
        historical_best = self.selector_stats.get_best_selectors()

        # ...then fall back to a hand-maintained list of known card selectors
        base_selectors = [
            "div[class*='_7jvw']",
            "div[data-testid='ad_library_card']",
            "div[class*='AdLibraryCard']",
            "div.AdLibraryCard",
            "div[class*='adCard']",
            "div[class*='ad_card']"
        ]

        combined_selectors = historical_best + [s for s in base_selectors if s not in historical_best]

        for selector in combined_selectors:
            try:
                self.selector_stats.update_selector_attempt(selector)
                elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                if elements:
                    logger.info(f"Found {len(elements)} Facebook ads using selector: {selector}")
                    self.selector_stats.update_selector_success(selector, len(elements))
                    return elements
            except Exception as e:
                logger.debug(f"Facebook selector {selector} failed: {e}")

        # Last resort: locate ads by text patterns, then walk up the DOM to
        # find the enclosing card container
        try:
            patterns = [
                "//div[contains(., 'Library ID:')]",
                "//div[contains(., 'Sponsored')]",
                "//div[contains(., 'Active')][contains(., 'Library ID')]",
                "//div[contains(., 'Inactive')][contains(., 'Library ID')]"
            ]

            for pattern in patterns:
                elements = self.driver.find_elements(By.XPATH, pattern)
                if elements:
                    ad_containers = []
                    for element in elements:
                        try:
                            # Climb at most five ancestors looking for a
                            # "card"-classed container
                            container = element
                            for _ in range(5):
                                class_attr = container.get_attribute("class")
                                if class_attr and "card" in class_attr.lower():
                                    ad_containers.append(container)
                                    break
                                container = container.find_element(By.XPATH, "..")
                        except Exception:
                            continue

                    if ad_containers:
                        logger.info(f"Found {len(ad_containers)} Facebook ads using text pattern approach")
                        self.selector_stats.update_selector_success("text_pattern_method", len(ad_containers))
                        return ad_containers
        except Exception as e:
            logger.debug(f"Facebook text pattern approach failed: {e}")

        return []
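
    # Ad Library cards typically flatten to text roughly like this, which is
    # what the parser below walks through line by line (illustrative):
    #
    #   Active
    #   Library ID: 1234567890
    #   Started running on ...
    #   See ad details
    #   Some Advertiser Name
    #   Sponsored
    #   The actual ad copy...
    #   Shop Now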

    def _extract_facebook_ad_data(self, ad_element, full_text):
        """Extract data from a Facebook ad card using multiple heuristics"""
        extracted_data = {}

        # Primary method: parse the flattened card text line by line
        if full_text:
            lines = full_text.split('\n')

            # The first line is usually the ad's active/inactive status
            if lines and lines[0] in ["Active", "Inactive"]:
                extracted_data["status"] = lines[0]

            # The advertiser name usually follows the "See ad details" /
            # "See summary details" row
            for i, line in enumerate(lines):
                if "See ad details" in line or "See summary details" in line:
                    if i + 1 < len(lines):
                        extracted_data["advertiser"] = lines[i + 1].strip()
                    break
            else:
                if lines:
                    extracted_data["advertiser"] = lines[0].strip()

            # Locate the ad copy: it normally starts after "Sponsored" and
            # ends at the first UI control
            content_start_idx = -1
            content_end_idx = len(lines)

            for i, line in enumerate(lines):
                if "Sponsored" in line:
                    content_start_idx = i + 1
                    break

            if content_start_idx == -1:
                # No "Sponsored" marker: take the first line that isn't
                # card metadata
                metadata_patterns = [
                    "Library ID:",
                    "Started running on",
                    "Platforms",
                    "Open Drop-down",
                    "See ad details",
                    "See summary details",
                    "This ad has multiple versions"
                ]

                for i, line in enumerate(lines):
                    if any(pattern in line for pattern in metadata_patterns):
                        continue
                    if i > 0:
                        content_start_idx = i
                        break

            ui_elements = [
                "Like", "Comment", "Share", "Learn More", "Shop Now",
                "Sign Up", "Visit Instagram profile", "See More"
            ]

            for i, line in enumerate(lines):
                if i <= content_start_idx:
                    continue
                if any(ui in line for ui in ui_elements):
                    content_end_idx = i
                    break

            if content_start_idx != -1 and content_start_idx < content_end_idx:
                content_lines = lines[content_start_idx:content_end_idx]
                extracted_data["text"] = "\n".join(content_lines).strip()
if "text" not in extracted_data or not extracted_data["text"]:
|
|
facebook_text_selectors = [
|
|
"div[data-ad-preview='message']",
|
|
"div[class*='_7jy6']",
|
|
"div[data-testid='ad-creative-text']",
|
|
"div[class*='_38ki']",
|
|
"span[class*='_7oe']",
|
|
"div.text_exposed_root"
|
|
]
|
|
|
|
for selector in facebook_text_selectors:
|
|
try:
|
|
elements = ad_element.find_elements(By.CSS_SELECTOR, selector)
|
|
text_content = " ".join([e.text.strip() for e in elements if e.text.strip()])
|
|
if text_content:
|
|
extracted_data["text"] = text_content
|
|
break
|
|
except:
|
|
pass
|
|
|
|
|
|
if "advertiser" not in extracted_data or not extracted_data["advertiser"]:
|
|
facebook_advertiser_selectors = [
|
|
"span[class*='fsl']",
|
|
"a[aria-label*='profile']",
|
|
"h4",
|
|
"div[class*='_8jh5']",
|
|
"a[role='link']",
|
|
"div[class*='_3qn7']",
|
|
"div[class*='_7jvw'] a",
|
|
]
|
|
|
|
for selector in facebook_advertiser_selectors:
|
|
try:
|
|
elements = ad_element.find_elements(By.CSS_SELECTOR, selector)
|
|
for element in elements:
|
|
text = element.text.strip()
|
|
if text and len(text) < 50:
|
|
extracted_data["advertiser"] = text
|
|
break
|
|
if "advertiser" in extracted_data and extracted_data["advertiser"]:
|
|
break
|
|
except:
|
|
pass
|
|
|
|
return extracted_data

    def _generate_placeholder_facebook_data(self, query):
        """Generate placeholder Facebook ad data when real ads cannot be scraped"""
        logger.info(f"Returning placeholder Facebook ad data for query: {query}")
        return [
            {
                "platform": "Facebook",
                "query": query,
                "advertiser": "Placeholder Advertiser 1",
                "text": f"This is a placeholder ad for {query} since no actual ads could be scraped.",
                "timestamp": datetime.now().isoformat(),
                "index": 1,
                "is_placeholder": True,
                "session_id": self.session_id
            },
            {
                "platform": "Facebook",
                "query": query,
                "advertiser": "Placeholder Advertiser 2",
                "text": f"Another placeholder ad for {query}. Please check your scraping settings.",
                "timestamp": datetime.now().isoformat(),
                "index": 2,
                "is_placeholder": True,
                "session_id": self.session_id
            }
        ]

    def _save_debug_data(self, prefix, query):
        """Save debugging data (screenshot, page HTML, sample ad) for investigation"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        debug_dir = "debug_data"

        if not os.path.exists(debug_dir):
            os.makedirs(debug_dir)

        # Sanitize the query so it is safe to embed in a filename
        safe_query = re.sub(r'[^\w.-]', '_', query)

        screenshot_path = f"{debug_dir}/{prefix}_{safe_query}_{timestamp}.png"
        self.driver.save_screenshot(screenshot_path)
        logger.info(f"Saved debug screenshot to {screenshot_path}")

        html_path = f"{debug_dir}/{prefix}_{safe_query}_{timestamp}.html"
        with open(html_path, "w", encoding="utf-8") as f:
            f.write(self.driver.page_source)
        logger.info(f"Saved debug HTML to {html_path}")

        # Also capture the first ad card in isolation, if one is present
        try:
            ad_elements = self.driver.find_elements(By.CSS_SELECTOR, "div[class*='_7jvw']")
            if ad_elements:
                first_ad = ad_elements[0]
                first_ad_html = first_ad.get_attribute('outerHTML')

                sample_path = f"{debug_dir}/{prefix}_sample_ad_{timestamp}.html"
                with open(sample_path, "w", encoding="utf-8") as f:
                    f.write(first_ad_html)
                logger.info(f"Saved sample ad HTML to {sample_path}")

                logger.info(f"Sample ad text structure: {first_ad.text[:300]}...")
        except Exception as e:
            logger.error(f"Error saving ad sample: {e}")

    def close(self):
        """Close the WebDriver and persist selector stats"""
        if self.driver:
            self.driver.quit()

        self.selector_stats._save_stats()
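
# Typical scraper lifecycle (illustrative):
#
#   scraper = FacebookAdsScraper(headless=True, debug_mode=False)
#   try:
#       ads = scraper.fetch_facebook_ads("some brand")
#   finally:
#       scraper.close()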


def fetch_facebook_ads(query):
    """Fetch Facebook ads and format them as text for the Gradio interface"""
    logger.info(f"Processing Facebook ad search for: {query}")

    scraper = FacebookAdsScraper(headless=True, debug_mode=True)

    visibility_ok = scraper.check_headless_visibility()
    if not visibility_ok:
        logger.warning("Headless visibility check failed, results may be affected")

    facebook_ads = scraper.fetch_facebook_ads(query)

    formatted_results = []
    for ad in facebook_ads:
        formatted_ad = f"Platform: {ad['platform']}\n"

        if 'status' in ad:
            formatted_ad += f"Status: {ad['status']}\n"

        formatted_ad += f"Advertiser: {ad['advertiser']}\n"

        # Wrap the ad text at roughly 80 characters for readability
        text_lines = []
        if ad['text'] and ad['text'] != "Ad content not available":
            words = ad['text'].split()
            current_line = ""
            for word in words:
                if len(current_line) + len(word) + 1 <= 80:
                    current_line += (" " + word if current_line else word)
                else:
                    text_lines.append(current_line)
                    current_line = word
            if current_line:
                text_lines.append(current_line)

            formatted_text = "\n".join(text_lines)
        else:
            formatted_text = ad['text']

        formatted_ad += f"Ad Text: {formatted_text}\n"
        formatted_ad += f"Timestamp: {ad['timestamp']}\n"
        if ad.get('is_placeholder', False):
            formatted_ad += "[THIS IS PLACEHOLDER DATA]\n"
        formatted_ad += "-" * 50
        formatted_results.append(formatted_ad)

    scraper.close()

    return "\n\n".join(formatted_results) if formatted_results else "No Facebook ads found for your query."


def save_ads_to_json(ads, query):
    """Save ads to a JSON file"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"facebook_ads_{query.replace(' ', '_')}_{timestamp}.json"

    try:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(ads, f, indent=2, ensure_ascii=False)
        logger.info(f"Saved ads to {filename}")
        return filename
    except Exception as e:
        logger.error(f"Error saving ads to JSON: {e}")
        return None


MAX_ADS_DEFAULT = 5

# Prefer the real GoogleAds module; fall back to a mock implementation that
# keeps the UI functional when the module is not installed.
try:
    from GoogleAds.main import GoogleAds, show_regions_list
    from GoogleAds.regions import Regions

    USING_ACTUAL_GOOGLE_ADS = True
    logger.info("Successfully imported GoogleAds module")
except ImportError as e:
    logger.warning(f"GoogleAds module not found: {e}. Using mock implementation.")
    USING_ACTUAL_GOOGLE_ADS = False

    Regions = {
        "GB": {"Region": "United Kingdom"}
    }


    def show_regions_list():
        """Mock function - only used if the real module fails to import"""
        return [("GB", "United Kingdom"), ("US", "United States")]


    class GoogleAds:
        """Mock scraper - only used if the real module fails to import"""

        def __init__(self, region="GB"):
            self.region = region
            logger.warning(f"Using MOCK GoogleAds implementation with region: {region}")
            logger.warning("Please install the GoogleAds module for actual data")

        def creative_search_by_advertiser_id(self, advertiser_id, count=5):
            logger.warning(f"MOCK: Searching for creatives from advertiser {advertiser_id}")
            return [f"creative_{i}_{advertiser_id}" for i in range(min(count, 3))]

        def get_detailed_ad(self, advertiser_id, creative_id):
            logger.warning(f"MOCK: Getting details for creative {creative_id}")

            advertiser_name = "Unknown"
            for adv in ADVERTISERS:
                if adv["id"] == advertiser_id:
                    advertiser_name = adv["name"]
                    break

            return {
                "Ad Format": "Text",
                "Advertiser": advertiser_name,
                "Advertiser Name": advertiser_name,
                "Ad Title": "MOCK DATA - INSTALL GOOGLE ADS MODULE",
                "Ad Body": "This is MOCK data because the GoogleAds module is not installed. Please install the proper module.",
                "Last Shown": datetime.now().strftime("%Y-%m-%d"),
                "Creative Id": creative_id,
                "Ad Link": "#"
            }


def clean_ad_text(text):
    """Clean ad text by removing mojibake and formatting artifacts."""
    if text is None or not isinstance(text, str):
        return ""

    # Strip common mojibake left over from mis-decoded UTF-8 punctuation
    cleaned = text.replace('â¦', '')
    cleaned = cleaned.replace('â©', '')
    cleaned = cleaned.replace('<dynamically generated based on landing page content>', '[Dynamic Content]')

    # Drop any remaining non-ASCII characters
    cleaned = re.sub(r'[^\x00-\x7F]+', '', cleaned)

    return cleaned.strip()
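
# The literal replacements above target mojibake: UTF-8 punctuation decoded
# with a single-byte codec. For example, "…" (U+2026) encodes to the bytes
# e2 80 a6, which render as "â€¦" under CP1252; with the unprintable middle
# byte dropped, the residue is the "â¦" string removed above.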


def get_regions_list():
    """Get a limited list of regions - only GB and anywhere."""
    regions = [
        ("anywhere", "Global (anywhere)"),
        ("GB", f"{Regions['GB']['Region']} (GB)")
    ]
    return regions


def search_by_advertiser_id(advertiser_id: str, max_ads=MAX_ADS_DEFAULT, region="GB", progress=gr.Progress(),
                            provided_name=None) -> Tuple[str, Optional[pd.DataFrame], Optional[Dict]]:
    """Search Google Ads transparency data for one advertiser ID and return
    a status message, a DataFrame of ads, and a summary dict."""
    try:
        progress(0, desc="Initializing scraper...")

        # Gradio dropdowns can hand back (value, label) tuples; normalize
        region_val = region
        if isinstance(region, tuple) and len(region) > 0:
            region_val = region[0]

        if region_val == "Global (anywhere)" or "anywhere" in str(region_val).lower():
            region_val = "anywhere"

        scraper = GoogleAds(region=region_val)

        progress(0.2, desc=f"Fetching ads for advertiser ID: {advertiser_id}")

        creative_ids = scraper.creative_search_by_advertiser_id(advertiser_id, count=max_ads)

        if not creative_ids:
            return f"No ads found for advertiser ID: {advertiser_id}", None, None

        progress(0.3, desc=f"Found {len(creative_ids)} ads. Fetching details...")

        ads_data = []
        ad_formats = {}

        for i, creative_id in enumerate(creative_ids):
            progress_val = 0.3 + (0.7 * (i / len(creative_ids)))
            progress(progress_val, desc=f"Processing ad {i + 1}/{len(creative_ids)}")

            try:
                ad_details = scraper.get_detailed_ad(advertiser_id, creative_id)

                if 'Ad Title' in ad_details:
                    ad_details['Ad Title'] = clean_ad_text(ad_details['Ad Title'])
                if 'Ad Body' in ad_details:
                    ad_details['Ad Body'] = clean_ad_text(ad_details['Ad Body'])

                ads_data.append(ad_details)

                # Tally formats for the summary
                ad_format = ad_details.get("Ad Format", "Unknown")
                ad_formats[ad_format] = ad_formats.get(ad_format, 0) + 1

                # Small delay between detail requests
                time.sleep(0.2)
            except Exception as e:
                logger.warning(f"Error fetching details for ad {creative_id}: {e}")

        if not ads_data:
            return f"Retrieved creative IDs but couldn't fetch ad details for advertiser ID: {advertiser_id}", None, None

        df = pd.DataFrame(ads_data)

        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

        # Resolve the advertiser name: caller-provided, then the known list,
        # then whatever the ad details expose
        advertiser_name = "Unknown"

        if provided_name:
            advertiser_name = provided_name
        else:
            for adv in ADVERTISERS:
                if adv["id"] == advertiser_id:
                    advertiser_name = adv["name"]
                    break

        if advertiser_name == "Unknown" and ads_data and len(ads_data) > 0:
            for field in ["Advertiser", "Advertiser Name", "advertiser_name"]:
                if field in ads_data[0]:
                    advertiser_name = ads_data[0][field]
                    break

        summary = {
            'advertiser_id': advertiser_id,
            'advertiser_name': advertiser_name,
            'ads_count': len(ads_data),
            'timestamp': timestamp,
            'region': region_val,
            'ad_formats': ad_formats
        }

        # Derive the ad date range, tolerating either field-name variant
        dates = []
        for ad in ads_data:
            for field in ["Last Shown", "last_shown_date"]:
                if field in ad and ad[field]:
                    dates.append(ad[field])
                    break

        if dates:
            summary['earliest_ad'] = min(dates)
            summary['latest_ad'] = max(dates)

        success_message = (
            f"Found {len(ads_data)} ads for advertiser '{advertiser_name}' (ID: {advertiser_id})."
        )

        progress(1.0, desc="Complete!")
        return success_message, df, summary

    except Exception as e:
        error_message = f"Error searching for advertiser ID: {str(e)}"
        return error_message, None, None
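
# The summary dict returned above looks like this (illustrative values):
#
#   {
#       "advertiser_id": "AR06148907109187584001",
#       "advertiser_name": "Sabertrio",
#       "ads_count": 5,
#       "timestamp": "2024-01-01_12-00-00",
#       "region": "GB",
#       "ad_formats": {"Text": 3, "Image": 2},
#       "earliest_ad": "2023-11-02",   # present only when dates were found
#       "latest_ad": "2024-01-01",
#   }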


def process_advertiser_search(advertiser_selection, region, max_ads, progress=gr.Progress()):
    """Handle the advertiser selection form submission and update the UI."""
    if not advertiser_selection:
        return "Please select an advertiser to search", None, None, None

    # Dropdown entries look like "ADVERTISER_ID: Name"
    parts = advertiser_selection.split(":", 1)
    advertiser_id = parts[0].strip()
    advertiser_name = parts[1].strip() if len(parts) > 1 else "Unknown"

    result_message, ads_df, summary_info = search_by_advertiser_id(
        advertiser_id, max_ads, region, progress, advertiser_name
    )

    analysis_html = analyze_ads(ads_df, summary_info) if ads_df is not None and not ads_df.empty else None

    return result_message, ads_df, analysis_html, summary_info


def analyze_ads(df: pd.DataFrame, summary: Dict) -> str:
    """
    Analyze ads data and generate insights.

    Args:
        df: DataFrame containing ad data
        summary: Dictionary with summary information

    Returns:
        HTML string with analysis results
    """
    if df is None or df.empty or summary is None:
        return "<h3>No data available for analysis</h3>"

    try:
        html = f"""
        <div style="font-family: Arial, sans-serif;">
            <h2>{summary.get('advertiser_name', 'Unknown Advertiser')} - Ad Analysis</h2>

            <div style="background-color: #f5f5f5; padding: 15px; border-radius: 5px; margin-bottom: 20px;">
                <h3>Overview</h3>
                <p><b>Advertiser ID:</b> {summary.get('advertiser_id', 'Unknown')}</p>
                <p><b>Total Ads Found:</b> {summary['ads_count']}</p>
                <p><b>Region:</b> {summary['region']}</p>
                <p><b>Data Collected:</b> {summary['timestamp'].replace('_', ' ').replace('-', '/')}</p>
                {f"<p><b>Ad Date Range:</b> {summary.get('earliest_ad')} to {summary.get('latest_ad')}</p>" if 'earliest_ad' in summary else ""}
            </div>

            <div style="display: flex; margin-bottom: 20px;">
                <div style="flex: 1; background-color: #f5f5f5; padding: 15px; border-radius: 5px; margin-right: 10px;">
                    <h3>Ad Format Distribution</h3>
                    <table style="width: 100%; border-collapse: collapse;">
                        <tr style="background-color: #eaeaea;">
                            <th style="text-align: left; padding: 8px; border-bottom: 1px solid #ddd;">Format</th>
                            <th style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">Count</th>
                            <th style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">Percentage</th>
                        </tr>
        """

        total = sum(summary['ad_formats'].values())
        for format_name, count in summary['ad_formats'].items():
            percentage = (count / total) * 100
            html += f"""
                        <tr>
                            <td style="padding: 8px; border-bottom: 1px solid #ddd;">{format_name}</td>
                            <td style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">{count}</td>
                            <td style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">{percentage:.1f}%</td>
                        </tr>
            """

        html += """
                    </table>
                </div>
        """

        # Word-frequency analysis of ad titles
        if 'Ad Title' in df.columns and not df['Ad Title'].isna().all():
            all_titles = ' '.join(df['Ad Title'].dropna().astype(str).tolist())
            words = re.findall(r'\b\w+\b', all_titles.lower())

            # Drop stop words and very short tokens
            stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to',
                          'for', 'with', 'by', 'of', 'is', 'are'}
            filtered_words = [word for word in words if word not in stop_words and len(word) > 2]

            word_counts = Counter(filtered_words).most_common(10)

            if word_counts:
                html += """
                <div style="flex: 1; background-color: #f5f5f5; padding: 15px; border-radius: 5px;">
                    <h3>Most Common Words in Ad Titles</h3>
                    <table style="width: 100%; border-collapse: collapse;">
                        <tr style="background-color: #eaeaea;">
                            <th style="text-align: left; padding: 8px; border-bottom: 1px solid #ddd;">Word</th>
                            <th style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">Frequency</th>
                        </tr>
                """

                for word, count in word_counts:
                    html += f"""
                        <tr>
                            <td style="padding: 8px; border-bottom: 1px solid #ddd;">{word}</td>
                            <td style="text-align: center; padding: 8px; border-bottom: 1px solid #ddd;">{count}</td>
                        </tr>
                    """

                html += """
                    </table>
                </div>
                """

        html += """
        </div>

        <h3>SEO & Marketing Insights</h3>
        <div style="background-color: #f5f5f5; padding: 15px; border-radius: 5px; margin-bottom: 20px;">
        """

        html += f"""
        <h4>Competitive Intelligence</h4>
        <ul>
            <li>The advertiser has been active in advertising until {summary.get('latest_ad', 'recently')}</li>
            <li>Their ad strategy focuses primarily on {max(summary['ad_formats'].items(), key=lambda x: x[1])[0]} ads</li>
            <li>Consider monitoring changes in their ad frequency and creative strategy over time</li>
        </ul>

        <h4>UK Market Insights</h4>
        <ul>
            <li>The ads were collected for the {summary['region']} market</li>
            <li>Regular monitoring can reveal seasonal UK advertising patterns</li>
            <li>Compare with other regions to identify UK-specific marketing approaches</li>
        </ul>
        """

        html += """
        </div>

        <h3>All Ad Examples</h3>
        """

        if not df.empty:
            # Show the most recently shown ads first
            if 'Last Shown' in df.columns:
                df = df.sort_values(by='Last Shown', ascending=False)

            for i, (_, ad) in enumerate(df.iterrows()):
                html += f"""
                <div style="background-color: #f5f5f5; padding: 15px; border-radius: 5px; margin-bottom: 15px;">
                    <h4>Ad {i + 1}: {ad.get('Creative Id', '')}</h4>
                    <p><b>Format:</b> {ad.get('Ad Format', 'Unknown')}</p>
                    <p><b>Last Shown:</b> {ad.get('Last Shown', 'Unknown')}</p>
                """

                if 'Ad Title' in ad and pd.notna(ad['Ad Title']) and ad['Ad Title']:
                    html += f"<p><b>Title:</b> {ad['Ad Title']}</p>"

                if 'Ad Body' in ad and pd.notna(ad['Ad Body']) and ad['Ad Body']:
                    body = ad['Ad Body']
                    if len(body) > 150:
                        body = body[:150] + "..."
                    html += f"<p><b>Body:</b> {body}</p>"

                if 'Image URL' in ad and pd.notna(ad['Image URL']) and ad['Image URL']:
                    html += f"""<p><img src="{ad['Image URL']}" style="max-width: 300px; max-height: 200px;" /></p>"""

                if 'Ad Link' in ad and pd.notna(ad['Ad Link']) and ad['Ad Link'] and ad.get('Ad Format') != 'Text':
                    html += f"""<p><b>Ad Link:</b> <a href="{ad['Ad Link']}" target="_blank">View Ad</a></p>"""

                html += "</div>"

        html += """
        </div>
        """

        return html

    except Exception as e:
        return f"<h3>Error analyzing data: {str(e)}</h3>"


def create_combined_app():
    """Create the combined Gradio interface with Facebook and Google Ads scrapers"""
    advertiser_choices = [f"{adv['id']}: {adv['name']}" for adv in ADVERTISERS]

    with gr.Blocks(title="Combined Ads Transparency Scraper") as app:
        gr.Markdown("# Combined Ads Transparency Scraper")
        gr.Markdown("## Search for ads from Facebook and Google Ads transparency tools")

        with gr.Tabs() as tabs:
            with gr.TabItem("Facebook Ad Library"):
                gr.Markdown("### Facebook Ad Library Search")
                gr.Markdown("Search for ads by brand, domain, or keyword")

                with gr.Row():
                    fb_query_input = gr.Textbox(
                        label="Search Query",
                        placeholder="Enter brand, domain or product name",
                        value=""
                    )
                    fb_search_button = gr.Button("Find Facebook Ads", variant="primary")

                fb_results_output = gr.Textbox(label="Search Results", lines=20)
                fb_save_button = gr.Button("Save Results to JSON")
                fb_save_status = gr.Textbox(label="Save Status", lines=1)

                def save_fb_results(query, results_text):
                    if not results_text or "No Facebook ads found" in results_text:
                        return "No ads to save"

                    # Note: this re-runs the scrape to obtain structured
                    # dicts, since the results box only holds formatted text
                    scraper = FacebookAdsScraper(headless=True, debug_mode=False)
                    ads = scraper.fetch_facebook_ads(query)
                    scraper.close()

                    filename = save_ads_to_json(ads, query)
                    if filename:
                        return f"Saved {len(ads)} ads to {filename}"
                    else:
                        return "Error saving ads to JSON"

                fb_search_button.click(
                    fn=fetch_facebook_ads,
                    inputs=[fb_query_input],
                    outputs=[fb_results_output]
                )

                fb_save_button.click(
                    fn=save_fb_results,
                    inputs=[fb_query_input, fb_results_output],
                    outputs=[fb_save_status]
                )

            with gr.TabItem("Google Ads (Lightsaber Companies)"):
                gr.Markdown("### Lightsaber Companies Ads Transparency Scraper")
                gr.Markdown("View Google Ads data for popular lightsaber companies")

                with gr.Row():
                    with gr.Column(scale=3):
                        advertiser_dropdown = gr.Dropdown(
                            choices=advertiser_choices,
                            label="Select Lightsaber Company",
                            info="Choose a company to view their Google Ads data"
                        )

                        with gr.Row():
                            region_dropdown = gr.Dropdown(
                                choices=get_regions_list(),
                                value="GB",
                                label="Region",
                                info="Choose between Global or UK"
                            )

                            max_ads_slider = gr.Slider(
                                minimum=1,
                                maximum=10,
                                value=5,
                                step=1,
                                label="Max Ads to Retrieve"
                            )

                        search_button = gr.Button("Search Ads", variant="primary")

                    with gr.Column(scale=2):
                        result_message = gr.Markdown(label="Search Result")

                with gr.Tabs() as google_result_tabs:
                    with gr.Tab("Analysis"):
                        analysis_html = gr.HTML()

                    with gr.Tab("Raw Data"):
                        ads_table = gr.DataFrame()

                # Holds the summary dict between callbacks
                summary_info = gr.State()

                search_button.click(
                    fn=process_advertiser_search,
                    inputs=[advertiser_dropdown, region_dropdown, max_ads_slider],
                    outputs=[result_message, ads_table, analysis_html, summary_info]
                )

        with gr.Accordion("About This Tool", open=False):
            gr.Markdown("""
            ## About Combined Ads Transparency Scraper

            This tool combines two different ad transparency scrapers:

            1. **Facebook Ad Library Scraper**: Search for any advertiser's ads on Facebook.
            2. **Google Ads Transparency Scraper**: View ads for popular lightsaber companies.

            ### Technical Details

            - The Facebook scraper uses Selenium WebDriver with anti-detection techniques.
            - The Google Ads scraper leverages the Google Ad Transparency API.
            - Both scrapers include adaptive error handling and fallback mechanisms.

            ### Usage Notes

            - Facebook scraping may take 30-60 seconds to complete
            - Search results are not stored permanently
            - Use the "Save Results" button to save data for later analysis

            **Note**: This tool is intended for research and educational purposes only.
            """)

    return app
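
# Example invocations (illustrative; substitute this file's actual name):
#
#   python combined_scraper.py --fb-query "lightsaber" --debug --save
#   python combined_scraper.py --google-advertiser AR06148907109187584001
#   python combined_scraper.py               # launch the Gradio UI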


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Combined Ads Transparency Scraper")
    # BooleanOptionalAction also generates --no-headless; with the previous
    # store_true + default=True combination the flag could never be disabled
    parser.add_argument("--headless", action=argparse.BooleanOptionalAction, default=True,
                        help="Run in headless mode (disable with --no-headless)")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode with extra logging")
    parser.add_argument("--fb-query", type=str, help="Facebook search query to run directly without Gradio")
    parser.add_argument("--google-advertiser", type=str, help="Google Ads advertiser ID to run directly without Gradio")
    parser.add_argument("--save", action="store_true", help="Save results to JSON file when using direct query")

    args = parser.parse_args()

    if args.fb_query:
        scraper = FacebookAdsScraper(headless=args.headless, debug_mode=args.debug)
        scraper.check_headless_visibility()

        facebook_ads = scraper.fetch_facebook_ads(args.fb_query)

        print(f"\nFound {len(facebook_ads)} Facebook ads for '{args.fb_query}'")

        if facebook_ads:
            for i, ad in enumerate(facebook_ads):
                print(f"\n--- Ad {i + 1} ---")
                print(f"Platform: {ad['platform']}")
                if 'status' in ad:
                    print(f"Status: {ad['status']}")
                print(f"Advertiser: {ad['advertiser']}")
                print(f"Text: {ad['text']}")
                if ad.get('is_placeholder', False):
                    print("[THIS IS PLACEHOLDER DATA]")

            if args.save:
                filename = save_ads_to_json(facebook_ads, args.fb_query)
                if filename:
                    print(f"\nSaved {len(facebook_ads)} ads to {filename}")
        else:
            print("No Facebook ads found.")

        scraper.close()

    elif args.google_advertiser:
        advertiser_id = args.google_advertiser

        advertiser_name = "Unknown"
        for adv in ADVERTISERS:
            if adv["id"] == advertiser_id:
                advertiser_name = adv["name"]
                break

        print(f"\nSearching for Google Ads from advertiser '{advertiser_name}' (ID: {advertiser_id})")


        class DummyProgress:
            """Stand-in for gr.Progress when running from the command line"""

            def __call__(self, value, desc=None):
                if desc:
                    print(f"{desc} ({value * 100:.0f}%)")


        result_message, ads_df, summary_info = search_by_advertiser_id(
            advertiser_id,
            max_ads=5,
            region="GB",
            progress=DummyProgress(),
            provided_name=advertiser_name
        )

        print(f"\n{result_message}")

        if ads_df is not None and not ads_df.empty:
            print("\nFound ads:")
            for i, (_, ad) in enumerate(ads_df.iterrows()):
                print(f"\n--- Ad {i + 1} ---")
                print(f"Format: {ad.get('Ad Format', 'Unknown')}")
                print(f"Title: {ad.get('Ad Title', 'Unknown')}")
                body_text = ad.get('Ad Body', 'Unknown')
                if len(body_text) > 100:
                    body_text = body_text[:100] + "..."
                print(f"Body: {body_text}")
                print(f"Last Shown: {ad.get('Last Shown', 'Unknown')}")
                print(f"Creative ID: {ad.get('Creative Id', 'Unknown')}")
        else:
            print("No Google ads found or error occurred.")

    else:
        app = create_combined_app()
        print("Starting Combined Ads Transparency Scraper")
        print("Facebook: Search for any brand or company")
        print("Google Ads: Available lightsaber companies:")
        for adv in ADVERTISERS:
            print(f"  - {adv['name']}")
        app.launch()