import logging
import time
from typing import Dict, Any, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse

from scrapers.base_scraper import BaseScraper
from scrapers.news_scraper import NewsScraper
from scrapers.blog_scraper import BlogScraper
from utils.rate_limiter import RateLimiter

logger = logging.getLogger(__name__)


class ScraperService:
    """Service to manage scraping operations"""

    def __init__(self, max_workers: int = 5, timeout: int = 30):
        """
        Initialize scraper service

        Args:
            max_workers: Maximum number of concurrent scrapers
            timeout: Timeout for each scraping operation in seconds
        """
        self.max_workers = max_workers
        self.timeout = timeout
        self.rate_limiters: Dict[str, RateLimiter] = {}  # Domain-specific rate limiters

        # Register available scrapers
        self.scrapers = {
            "news": NewsScraper(timeout),
            "blog": BlogScraper(timeout),
        }

    def get_scraper_for_url(self, url: str) -> Tuple[BaseScraper, str]:
        """
        Determine the appropriate scraper to use for a URL

        Args:
            url: URL to scrape

        Returns:
            Tuple of (scraper instance, scraper type)
        """
        # Simple logic to determine scraper type based on URL patterns
        # This could be enhanced with more sophisticated detection
        domain = urlparse(url).netloc.lower()

        # News site patterns
        news_patterns = ["news", "cnn", "bbc", "reuters", "nytimes",
                         "washingtonpost", "guardian", "aljazeera",
                         "foxnews", "nbcnews", "abc"]

        # Blog patterns
        blog_patterns = ["blog", "medium", "wordpress", "blogspot",
                         "tumblr", "substack", "ghost", "hashnode"]

        # Check domain against patterns
        for pattern in news_patterns:
            if pattern in domain:
                return self.scrapers["news"], "news"

        for pattern in blog_patterns:
            if pattern in domain:
                return self.scrapers["blog"], "blog"

        # Default to news scraper
        return self.scrapers["news"], "news"

    def _get_rate_limiter(self, domain: str) -> RateLimiter:
        """Get or create a rate limiter for a specific domain"""
        if domain not in self.rate_limiters:
            # Default: 5 requests per minute for each domain
            self.rate_limiters[domain] = RateLimiter(window_size=60, max_requests=5)
        return self.rate_limiters[domain]

    def scrape_url(self, url: str, scraper_type: Optional[str] = None) -> Dict[str, Any]:
        """
        Scrape a single URL

        Args:
            url: URL to scrape
            scraper_type: Optional type of scraper to use

        Returns:
            Dictionary with scraped data
        """
        try:
            # Parse domain for rate limiting
            domain = urlparse(url).netloc
            rate_limiter = self._get_rate_limiter(domain)

            # Check if we can proceed with the request
            if not rate_limiter.can_proceed():
                wait_time = rate_limiter.get_wait_time()
                logger.warning(f"Rate limit reached for {domain}. Waiting {wait_time:.2f} seconds")
                time.sleep(wait_time)

            # Select appropriate scraper
            if scraper_type and scraper_type in self.scrapers:
                scraper = self.scrapers[scraper_type]
                selected_type = scraper_type
            else:
                scraper, selected_type = self.get_scraper_for_url(url)

            logger.info(f"Scraping {url} with {selected_type} scraper")

            # Perform scraping and record the request
            result = scraper.scrape(url)
            rate_limiter.record_request()

            # Add metadata about scraping
            result["scraper_type"] = selected_type
            result["scraped_at"] = time.time()

            return result

        except Exception as e:
            logger.error(f"Error scraping URL {url}: {str(e)}")
            return {
                "success": False,
                "url": url,
                "error": str(e),
                "scraped_at": time.time()
            }

    def scrape_multiple_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
        """
        Scrape multiple URLs in parallel

        Args:
            urls: List of URLs to scrape

        Returns:
            List of dictionaries with scraped data
        """
        results = []

        # Use ThreadPoolExecutor for concurrent scraping
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit scraping tasks
            future_to_url = {executor.submit(self.scrape_url, url): url for url in urls}

            # Collect results as they complete
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    results.append(result)
                    logger.info(f"Completed scraping: {url}")
                except Exception as e:
                    logger.error(f"Exception scraping {url}: {str(e)}")
                    results.append({
                        "success": False,
                        "url": url,
                        "error": str(e),
                        "scraped_at": time.time()
                    })

        return results
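

# --- Usage sketch (illustrative only) ---
# A minimal example of driving ScraperService, assuming the scrapers/ and
# utils/ packages imported above are importable. The URLs are placeholders
# chosen only to show how domain-based scraper selection and batch scraping
# are invoked; the shape of each result dict depends on the concrete scrapers.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    service = ScraperService(max_workers=3, timeout=15)

    # Single URL: the "bbc" pattern routes this to the news scraper
    single = service.scrape_url("https://www.bbc.com/news/example-article")
    print(single.get("scraper_type"), single.get("scraped_at"))

    # Multiple URLs scraped concurrently; results arrive in completion order
    batch = service.scrape_multiple_urls([
        "https://medium.com/@someone/example-post",  # matches the "medium" blog pattern
        "https://www.reuters.com/example-story",     # matches the "reuters" news pattern
    ])
    for item in batch:
        print(item.get("url", "<unknown>"), item.get("success", True))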