import os
import re
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Dict, Any
from urllib.parse import urlparse

from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import TimeoutException
import uvicorn

app = FastAPI(
    title="Threads Media Extractor API",
    description="Extract media URLs from Threads posts - Optimized version",
    version="2.1.0"
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global driver pool for reuse
driver_pool = []
executor = ThreadPoolExecutor(max_workers=2)


class MediaItem(BaseModel):
    url: str


class ThreadsResponse(BaseModel):
    post_url: str
    url: Optional[str] = None
    picker: Optional[List[MediaItem]] = None
    media_count: int
    post_text: Optional[str] = None
    author: Optional[str] = None
    success: bool
    processing_time: Optional[float] = None
    # None-valued fields are excluded when the handler serializes the model
    # with model_dump(exclude_none=True); exclude_none is not a valid Pydantic
    # model Config option, so no Config class is needed here.


class ErrorResponse(BaseModel):
    error: str
    success: bool = False
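
# Sketch of the two success-response shapes produced by /extract (the URLs and
# timing below are placeholders, not real output). A post with exactly one
# media item keeps the flat "url" field and omits "picker":
#
#   {"post_url": "...", "url": "https://.../video.mp4",
#    "media_count": 1, "success": true, "processing_time": 4.2}
#
# A post with several items omits "url" and returns a "picker" list instead:
#
#   {"post_url": "...", "media_count": 3, "success": true,
#    "picker": [{"url": "..."}, {"url": "..."}, {"url": "..."}]}
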
def create_optimized_driver():
    """Create and configure an optimized Chrome WebDriver."""
    options = Options()
    options.add_argument('--headless=new')  # Use new headless mode
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-gpu')
    options.add_argument('--disable-extensions')
    options.add_argument('--disable-plugins')
    options.add_argument('--disable-default-apps')
    options.add_argument('--disable-background-timer-throttling')
    options.add_argument('--disable-backgrounding-occluded-windows')
    options.add_argument('--disable-renderer-backgrounding')
    options.add_argument('--disable-features=TranslateUI')
    options.add_argument('--disable-ipc-flooding-protection')

    # Performance optimizations
    options.add_argument('--memory-pressure-off')
    options.add_argument('--max_old_space_size=4096')
    options.add_argument('--window-size=1280,720')  # Smaller window

    # Network optimizations
    options.add_argument('--aggressive-cache-discard')
    options.add_argument('--disable-background-networking')

    # Reduce automation fingerprints
    options.add_experimental_option('useAutomationExtension', False)
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_argument('--disable-blink-features=AutomationControlled')

    # User agent
    options.add_argument(
        '--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    )

    try:
        driver = webdriver.Chrome(options=options)
        driver.implicitly_wait(5)  # Reduced wait time
        driver.set_page_load_timeout(15)  # Reduced timeout

        # Align the CDP-level user agent with the command-line override
        driver.execute_cdp_cmd('Network.setUserAgentOverride', {
            "userAgent": 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
                         '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        })
        return driver
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create browser driver: {str(e)}")


def get_driver():
    """Get a driver from the pool, or create a new one if the pool is empty."""
    if driver_pool:
        return driver_pool.pop()
    return create_optimized_driver()


def return_driver(driver):
    """Return a driver to the pool for reuse."""
    if len(driver_pool) < 2:  # Keep at most 2 drivers in the pool
        driver_pool.append(driver)
    else:
        try:
            driver.quit()
        except Exception:
            pass


def extract_post_id_from_url(url: str) -> Optional[str]:
    """Extract the post ID from a Threads URL."""
    patterns = [
        r'threads\.net/@[^/]+/post/([A-Za-z0-9_-]+)',
        r'threads\.net/t/([A-Za-z0-9_-]+)',
        r'threads\.com/@[^/]+/post/([A-Za-z0-9_-]+)',
        r'threads\.com/t/([A-Za-z0-9_-]+)',
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None


def is_valid_threads_url(url: str) -> bool:
    """Validate that a URL points at a Threads post."""
    try:
        parsed = urlparse(url)
        return (
            parsed.netloc in ['threads.net', 'www.threads.net', 'threads.com', 'www.threads.com']
            and (('/post/' in parsed.path) or ('/t/' in parsed.path))
        )
    except Exception:
        return False
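
# Quick sanity check for the two URL helpers above. The handle and the post ID
# are hypothetical placeholders; the expected results follow directly from the
# regex patterns and the netloc whitelist:
#
#   >>> is_valid_threads_url("https://www.threads.net/@someuser/post/C8abc123xyz")
#   True
#   >>> extract_post_id_from_url("https://www.threads.net/@someuser/post/C8abc123xyz")
#   'C8abc123xyz'
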
def fast_extract_media(driver: webdriver.Chrome, url: str) -> Dict[str, Any]:
    """Optimized media extraction with faster loading."""
    media_urls = []
    post_text = None
    author = None  # Author extraction is not implemented yet

    try:
        start_time = time.time()

        # Navigate to the URL
        driver.get(url)

        # Wait for the document to finish loading, but don't block forever
        try:
            WebDriverWait(driver, 8).until(
                lambda d: d.execute_script("return document.readyState") == "complete"
            )
        except TimeoutException:
            pass  # Continue even on timeout

        # Short pause for dynamic content (reduced from 3 seconds)
        time.sleep(1.5)

        # Extract videos first (most important)
        video_elements = driver.find_elements(By.TAG_NAME, 'video')
        for video in video_elements:
            src = video.get_attribute('src')
            if src and src.startswith('http'):
                media_urls.append(src)
            # Check nested <source> elements as well
            sources = video.find_elements(By.TAG_NAME, 'source')
            for source in sources:
                src = source.get_attribute('src')
                if src and src.startswith('http'):
                    media_urls.append(src)

        # If no videos were found, fall back to a quick image scan
        if not media_urls:
            img_elements = driver.find_elements(By.TAG_NAME, 'img')[:10]  # Limit to first 10 images
            for img in img_elements:
                src = img.get_attribute('src')
                if src and src.startswith('http') and any(
                    ext in src.lower() for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp']
                ):
                    if not any(exclude in src.lower() for exclude in ['profile', 'avatar', 'icon', 'logo']):
                        media_urls.append(src)

        # Quick text extraction (optional; skipped on any failure)
        try:
            text_elements = driver.find_elements(By.CSS_SELECTOR, 'div[role="article"] span, article span')[:5]
            for element in text_elements:
                text = element.text.strip()
                if text and len(text) > 10 and not post_text:
                    post_text = text
                    break
        except Exception:
            pass

        # Remove duplicates while preserving order
        seen = set()
        unique_media_urls = []
        for media_url in media_urls:
            if media_url not in seen:
                seen.add(media_url)
                unique_media_urls.append(media_url)

        processing_time = time.time() - start_time

        return {
            "media_urls": unique_media_urls,
            "post_text": post_text,
            "author": author,
            "processing_time": processing_time
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error extracting media: {str(e)}")


def extract_media_sync(url: str) -> Dict[str, Any]:
    """Synchronous wrapper for thread-pool execution."""
    driver = None
    try:
        driver = get_driver()
        return fast_extract_media(driver, url)
    finally:
        if driver:
            return_driver(driver)


@app.get("/")
async def health_check():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "service": "extractor",
        "version": "2.1.0",
        "driver_pool_size": len(driver_pool)
    }


@app.get("/extract")
async def extract_media(url: str = Query(..., description="Threads post URL")):
    """
    Extract media URLs from a Threads post (optimized version).

    Args:
        url: The Threads post URL to extract media from.

    Returns:
        ThreadsResponse with media URLs and metadata.
    """
    # Validate URL
    if not url:
        raise HTTPException(status_code=400, detail="URL parameter is required")
    if not is_valid_threads_url(url):
        raise HTTPException(status_code=400, detail="Invalid Threads URL format")

    # Extract post ID (used only as an extra validation step)
    post_id = extract_post_id_from_url(url)
    if not post_id:
        raise HTTPException(status_code=400, detail="Could not extract post ID from URL")

    try:
        # Run extraction in the thread pool so the event loop stays responsive
        loop = asyncio.get_running_loop()
        extracted_data = await loop.run_in_executor(executor, extract_media_sync, url)

        media_urls = extracted_data["media_urls"]
        media_count = len(media_urls)

        # Base response data
        response_data = {
            "post_url": url,
            "media_count": media_count,
            "post_text": extracted_data["post_text"],
            "author": extracted_data["author"],
            "success": True,
            "processing_time": extracted_data.get("processing_time")
        }

        # Include either "url" or "picker" depending on the media count
        if media_count == 1:
            response_data["url"] = media_urls[0]  # Single item: omit the picker field
        elif media_count > 1:
            # Multiple items: omit the url field
            response_data["picker"] = [{"url": media_url} for media_url in media_urls]
        # If media_count is 0, neither url nor picker is included

        # Serialize with None-valued fields excluded
        response = ThreadsResponse(**response_data)
        return JSONResponse(content=response.model_dump(exclude_none=True))
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@app.on_event("shutdown")
async def shutdown_event():
    """Clean up resources on shutdown."""
    executor.shutdown(wait=False)
    for driver in driver_pool:
        try:
            driver.quit()
        except Exception:
            pass


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Custom HTTP exception handler."""
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "success": False,
            "status_code": exc.status_code
        }
    )


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)
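
# Example local run (assuming this file is saved as app.py; the post URL is a
# placeholder, not a real post):
#
#   $ python app.py
#   $ curl "http://localhost:7860/extract?url=https://www.threads.net/@someuser/post/C8abc123xyz"
#
# Failures come back through the custom exception handler, e.g.:
#
#   {"error": "Invalid Threads URL format", "success": false, "status_code": 400}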