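"""Scraper service: picks an appropriate scraper for each URL, applies
per-domain rate limiting, and runs single or concurrent scraping jobs."""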
import logging
import time
from typing import Dict, Any, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse

from scrapers.base_scraper import BaseScraper
from scrapers.news_scraper import NewsScraper
from scrapers.blog_scraper import BlogScraper
from utils.rate_limiter import RateLimiter

logger = logging.getLogger(__name__)


class ScraperService:
"""Service to manage scraping operations"""
def __init__(self, max_workers: int = 5, timeout: int = 30):
"""
Initialize scraper service
Args:
max_workers: Maximum number of concurrent scrapers
timeout: Timeout for each scraping operation in seconds
"""
self.max_workers = max_workers
self.timeout = timeout
self.rate_limiters = {} # Domain-specific rate limiters
# Register available scrapers
self.scrapers = {
"news": NewsScraper(timeout),
"blog": BlogScraper(timeout),
}
def get_scraper_for_url(self, url: str) -> Tuple[BaseScraper, str]:
"""
Determine the appropriate scraper to use for a URL
Args:
url: URL to scrape
Returns:
Tuple of (scraper instance, scraper type)
"""
# Simple logic to determine scraper type based on URL patterns
# This could be enhanced with more sophisticated detection
domain = urlparse(url).netloc.lower()
# News site patterns
news_patterns = ["news", "cnn", "bbc", "reuters", "nytimes", "washingtonpost",
"guardian", "aljazeera", "foxnews", "nbcnews", "abc"]
# Blog patterns
blog_patterns = ["blog", "medium", "wordpress", "blogspot", "tumblr",
"substack", "ghost", "hashnode"]
# Check domain against patterns
for pattern in news_patterns:
if pattern in domain:
return self.scrapers["news"], "news"
for pattern in blog_patterns:
if pattern in domain:
return self.scrapers["blog"], "blog"
# Default to news scraper
return self.scrapers["news"], "news"
def _get_rate_limiter(self, domain: str) -> RateLimiter:
"""Get or create a rate limiter for a specific domain"""
if domain not in self.rate_limiters:
# Default: 5 requests per minute for each domain
self.rate_limiters[domain] = RateLimiter(window_size=60, max_requests=5)
return self.rate_limiters[domain]
def scrape_url(self, url: str, scraper_type: Optional[str] = None) -> Dict[str, Any]:
"""
Scrape a single URL
Args:
url: URL to scrape
scraper_type: Optional type of scraper to use
Returns:
Dictionary with scraped data
"""
try:
# Parse domain for rate limiting
domain = urlparse(url).netloc
rate_limiter = self._get_rate_limiter(domain)
# Check if we can proceed with the request
if not rate_limiter.can_proceed():
wait_time = rate_limiter.get_wait_time()
logger.warning(f"Rate limit reached for {domain}. Waiting {wait_time:.2f} seconds")
time.sleep(wait_time)
# Select appropriate scraper
if scraper_type and scraper_type in self.scrapers:
scraper = self.scrapers[scraper_type]
selected_type = scraper_type
else:
scraper, selected_type = self.get_scraper_for_url(url)
logger.info(f"Scraping {url} with {selected_type} scraper")
# Perform scraping and record the request
result = scraper.scrape(url)
rate_limiter.record_request()
# Add metadata about scraping
result["scraper_type"] = selected_type
result["scraped_at"] = time.time()
return result
except Exception as e:
logger.error(f"Error scraping URL {url}: {str(e)}")
return {
"success": False,
"url": url,
"error": str(e),
"scraped_at": time.time()
}
def scrape_multiple_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
"""
Scrape multiple URLs in parallel
Args:
urls: List of URLs to scrape
Returns:
List of dictionaries with scraped data
"""
results = []
# Use ThreadPoolExecutor for concurrent scraping
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit scraping tasks
future_to_url = {executor.submit(self.scrape_url, url): url for url in urls}
# Collect results as they complete
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
results.append(result)
logger.info(f"Completed scraping: {url}")
except Exception as e:
logger.error(f"Exception scraping {url}: {str(e)}")
results.append({
"success": False,
"url": url,
"error": str(e),
"scraped_at": time.time()
})
return results
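# Example usage: a minimal sketch, assuming the scrapers package and
# utils.rate_limiter.RateLimiter behave as they are used above. The URLs
# below are placeholders, not endpoints this project is known to target.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    service = ScraperService(max_workers=3, timeout=15)

    # Single URL: the service picks a scraper from the domain unless a
    # scraper_type ("news" or "blog") is passed explicitly.
    single = service.scrape_url("https://www.example-news-site.com/article")
    print("scraper used:", single.get("scraper_type"))

    # Multiple URLs are scraped concurrently via the thread pool.
    batch = service.scrape_multiple_urls([
        "https://www.example-news-site.com/another-article",
        "https://example-blog.medium.com/some-post",
    ])
    print(f"Scraped {len(batch)} URLs")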