import logging
import time
from typing import Dict, Any, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse

from scrapers.base_scraper import BaseScraper
from scrapers.news_scraper import NewsScraper
from scrapers.blog_scraper import BlogScraper
from utils.rate_limiter import RateLimiter

logger = logging.getLogger(__name__)
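
# NOTE: utils.rate_limiter.RateLimiter is defined elsewhere in the project. Based on
# how it is used below, it is assumed to take window_size (seconds) and max_requests
# in its constructor and to expose can_proceed(), get_wait_time(), and record_request().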


class ScraperService:
    """Service to manage scraping operations."""

    def __init__(self, max_workers: int = 5, timeout: int = 30):
        """
        Initialize the scraper service.

        Args:
            max_workers: Maximum number of concurrent scrapers
            timeout: Timeout for each scraping operation in seconds
        """
        self.max_workers = max_workers
        self.timeout = timeout
        self.rate_limiters = {}  # Domain-specific rate limiters

        # Register available scrapers
        self.scrapers = {
            "news": NewsScraper(timeout),
            "blog": BlogScraper(timeout),
        }

    def get_scraper_for_url(self, url: str) -> Tuple[BaseScraper, str]:
        """
        Determine the appropriate scraper to use for a URL.

        Args:
            url: URL to scrape

        Returns:
            Tuple of (scraper instance, scraper type)
        """
        # Simple logic to determine scraper type based on URL patterns.
        # This could be enhanced with more sophisticated detection.
        domain = urlparse(url).netloc.lower()

        # News site patterns
        news_patterns = ["news", "cnn", "bbc", "reuters", "nytimes", "washingtonpost",
                         "guardian", "aljazeera", "foxnews", "nbcnews", "abc"]

        # Blog patterns
        blog_patterns = ["blog", "medium", "wordpress", "blogspot", "tumblr",
                         "substack", "ghost", "hashnode"]

        # Check domain against patterns
        for pattern in news_patterns:
            if pattern in domain:
                return self.scrapers["news"], "news"

        for pattern in blog_patterns:
            if pattern in domain:
                return self.scrapers["blog"], "blog"

        # Default to news scraper
        return self.scrapers["news"], "news"

    def _get_rate_limiter(self, domain: str) -> RateLimiter:
        """Get or create a rate limiter for a specific domain."""
        if domain not in self.rate_limiters:
            # Default: 5 requests per minute for each domain
            self.rate_limiters[domain] = RateLimiter(window_size=60, max_requests=5)
        return self.rate_limiters[domain]

    def scrape_url(self, url: str, scraper_type: Optional[str] = None) -> Dict[str, Any]:
        """
        Scrape a single URL.

        Args:
            url: URL to scrape
            scraper_type: Optional type of scraper to use

        Returns:
            Dictionary with scraped data
        """
        try:
            # Parse domain for rate limiting
            domain = urlparse(url).netloc
            rate_limiter = self._get_rate_limiter(domain)

            # Check if we can proceed with the request
            if not rate_limiter.can_proceed():
                wait_time = rate_limiter.get_wait_time()
                logger.warning(f"Rate limit reached for {domain}. Waiting {wait_time:.2f} seconds")
                time.sleep(wait_time)

            # Select appropriate scraper
            if scraper_type and scraper_type in self.scrapers:
                scraper = self.scrapers[scraper_type]
                selected_type = scraper_type
            else:
                scraper, selected_type = self.get_scraper_for_url(url)

            logger.info(f"Scraping {url} with {selected_type} scraper")

            # Perform scraping and record the request
            result = scraper.scrape(url)
            rate_limiter.record_request()

            # Add metadata about scraping
            result["scraper_type"] = selected_type
            result["scraped_at"] = time.time()

            return result
        except Exception as e:
            logger.error(f"Error scraping URL {url}: {e}")
            return {
                "success": False,
                "url": url,
                "error": str(e),
                "scraped_at": time.time(),
            }

    def scrape_multiple_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
        """
        Scrape multiple URLs in parallel.

        Args:
            urls: List of URLs to scrape

        Returns:
            List of dictionaries with scraped data
        """
        results = []

        # Use ThreadPoolExecutor for concurrent scraping
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit scraping tasks
            future_to_url = {executor.submit(self.scrape_url, url): url for url in urls}

            # Collect results as they complete
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    results.append(result)
                    logger.info(f"Completed scraping: {url}")
                except Exception as e:
                    logger.error(f"Exception scraping {url}: {e}")
                    results.append({
                        "success": False,
                        "url": url,
                        "error": str(e),
                        "scraped_at": time.time(),
                    })

        return results
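

# Illustrative usage sketch, not part of the service itself: it shows how the class
# above might be driven. The URLs are placeholders, and running this assumes the
# scrapers/ and utils/ packages on the import path provide working implementations.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    service = ScraperService(max_workers=3, timeout=15)

    # Single URL: the service picks a scraper from the domain (here, the "bbc" pattern)
    single = service.scrape_url("https://www.bbc.com/news/example-article")
    print("scraper used:", single.get("scraper_type"), "| error:", single.get("error"))

    # Multiple URLs scraped concurrently; result order follows completion, not input order
    batch = service.scrape_multiple_urls([
        "https://medium.com/@example/some-post",
        "https://www.reuters.com/world/example-story",
    ])
    for item in batch:
        print("scraper used:", item.get("scraper_type"), "| error:", item.get("error"))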