# MT564AITraining/services/scraper_service.py
import logging
import time
from typing import Dict, Any, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
from scrapers.base_scraper import BaseScraper
from scrapers.news_scraper import NewsScraper
from scrapers.blog_scraper import BlogScraper
from utils.rate_limiter import RateLimiter
logger = logging.getLogger(__name__)
class ScraperService:
"""Service to manage scraping operations"""
def __init__(self, max_workers: int = 5, timeout: int = 30):
"""
Initialize scraper service
Args:
max_workers: Maximum number of concurrent scrapers
timeout: Timeout for each scraping operation in seconds
"""
self.max_workers = max_workers
self.timeout = timeout
self.rate_limiters = {} # Domain-specific rate limiters
# Register available scrapers
self.scrapers = {
"news": NewsScraper(timeout),
"blog": BlogScraper(timeout),
}
def get_scraper_for_url(self, url: str) -> Tuple[BaseScraper, str]:
"""
Determine the appropriate scraper to use for a URL
Args:
url: URL to scrape
Returns:
Tuple of (scraper instance, scraper type)
"""
# Simple logic to determine scraper type based on URL patterns
# This could be enhanced with more sophisticated detection
domain = urlparse(url).netloc.lower()
# News site patterns
news_patterns = ["news", "cnn", "bbc", "reuters", "nytimes", "washingtonpost",
"guardian", "aljazeera", "foxnews", "nbcnews", "abc"]
# Blog patterns
blog_patterns = ["blog", "medium", "wordpress", "blogspot", "tumblr",
"substack", "ghost", "hashnode"]
# Check domain against patterns
for pattern in news_patterns:
if pattern in domain:
return self.scrapers["news"], "news"
for pattern in blog_patterns:
if pattern in domain:
return self.scrapers["blog"], "blog"
# Default to news scraper
return self.scrapers["news"], "news"
def _get_rate_limiter(self, domain: str) -> RateLimiter:
"""Get or create a rate limiter for a specific domain"""
if domain not in self.rate_limiters:
# Default: 5 requests per minute for each domain
self.rate_limiters[domain] = RateLimiter(window_size=60, max_requests=5)
return self.rate_limiters[domain]
def scrape_url(self, url: str, scraper_type: Optional[str] = None) -> Dict[str, Any]:
"""
Scrape a single URL
Args:
url: URL to scrape
scraper_type: Optional type of scraper to use
Returns:
Dictionary with scraped data
"""
try:
# Parse domain for rate limiting
domain = urlparse(url).netloc
rate_limiter = self._get_rate_limiter(domain)
# Check if we can proceed with the request
if not rate_limiter.can_proceed():
wait_time = rate_limiter.get_wait_time()
logger.warning(f"Rate limit reached for {domain}. Waiting {wait_time:.2f} seconds")
time.sleep(wait_time)
# Select appropriate scraper
if scraper_type and scraper_type in self.scrapers:
scraper = self.scrapers[scraper_type]
selected_type = scraper_type
else:
scraper, selected_type = self.get_scraper_for_url(url)
logger.info(f"Scraping {url} with {selected_type} scraper")
# Perform scraping and record the request
result = scraper.scrape(url)
rate_limiter.record_request()
# Add metadata about scraping
result["scraper_type"] = selected_type
result["scraped_at"] = time.time()
return result
except Exception as e:
logger.error(f"Error scraping URL {url}: {str(e)}")
return {
"success": False,
"url": url,
"error": str(e),
"scraped_at": time.time()
}
def scrape_multiple_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
"""
Scrape multiple URLs in parallel
Args:
urls: List of URLs to scrape
Returns:
List of dictionaries with scraped data
"""
results = []
# Use ThreadPoolExecutor for concurrent scraping
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit scraping tasks
future_to_url = {executor.submit(self.scrape_url, url): url for url in urls}
# Collect results as they complete
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
results.append(result)
logger.info(f"Completed scraping: {url}")
except Exception as e:
logger.error(f"Exception scraping {url}: {str(e)}")
results.append({
"success": False,
"url": url,
"error": str(e),
"scraped_at": time.time()
})
return results
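
# --- Example usage (illustrative sketch) ---
# The URLs below are hypothetical placeholders; actual results depend on the
# NewsScraper/BlogScraper implementations and the RateLimiter in utils.rate_limiter.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    service = ScraperService(max_workers=3, timeout=20)

    # Single URL: the scraper type is inferred from the domain unless passed explicitly.
    single = service.scrape_url("https://example-news-site.com/article")
    print(single.get("scraper_type"), single.get("success"))

    # Multiple URLs are scraped concurrently via the thread pool.
    batch = service.scrape_multiple_urls([
        "https://example-news-site.com/article",
        "https://example.blogspot.com/post",
    ])
    for item in batch:
        print(item.get("url"), item.get("success"))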