import re
import time
import json
import random
import hashlib
import logging
import requests
import pandas as pd
from pathlib import Path
from newspaper import Article, build
from datetime import datetime, timedelta
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Optional, Tuple
from data.validation_schemas import ValidationLevel
from data.data_validator import DataValidationPipeline
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/tmp/scraping.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class RobustNewsScraper:
    """Production-ready news scraper with comprehensive error handling and rate limiting"""

    def __init__(self):
        self.setup_paths()
        self.setup_scraping_config()
        self.session = self.create_session()
        self.scraped_urls = self.load_scraped_urls()

    def setup_paths(self):
        """Set up all necessary paths"""
        self.base_dir = Path("/tmp")
        self.data_dir = self.base_dir / "data"
        self.data_dir.mkdir(parents=True, exist_ok=True)

        self.output_path = self.data_dir / "scraped_real.csv"
        self.metadata_path = self.data_dir / "scraping_metadata.json"
        self.urls_cache_path = self.data_dir / "scraped_urls.json"

    def setup_scraping_config(self):
        """Set up scraping configuration"""
        # Per-site targets: article quota and politeness delay (seconds) between requests.
        self.news_sites = [
            {
                "name": "Reuters",
                "url": "https://www.reuters.com/",
                "max_articles": 8,
                "delay": 2.0
            },
            {
                "name": "BBC",
                "url": "https://www.bbc.com/news",
                "max_articles": 7,
                "delay": 2.5
            },
            {
                "name": "NPR",
                "url": "https://www.npr.org/",
                "max_articles": 5,
                "delay": 3.0
            },
            {
                "name": "Associated Press",
                "url": "https://apnews.com/",
                "max_articles": 5,
                "delay": 2.0
            }
        ]

        # Global limits applied across all sources.
        self.max_articles_total = 20
        self.min_article_length = 100
        self.max_article_length = 10000
        self.scraping_timeout = 30
        self.max_retries = 3

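    # Illustration only (not part of the original configuration): an additional
    # source could be registered by appending a dict with the same four keys, e.g.
    #   self.news_sites.append({"name": "Example News",
    #                           "url": "https://news.example.com/",
    #                           "max_articles": 5, "delay": 2.0})
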
    def create_session(self) -> requests.Session:
        """Create a configured requests session"""
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })
        return session

    def load_scraped_urls(self) -> set:
        """Load previously scraped URLs to avoid duplicates"""
        if self.urls_cache_path.exists():
            try:
                with open(self.urls_cache_path, 'r') as f:
                    urls_data = json.load(f)

                # Keep only URLs scraped within the last 30 days.
                cutoff_date = datetime.now() - timedelta(days=30)
                recent_urls = {
                    url for url, timestamp in urls_data.items()
                    if datetime.fromisoformat(timestamp) > cutoff_date
                }
                logger.info(f"Loaded {len(recent_urls)} recent URLs from cache")
                return recent_urls
            except Exception as e:
                logger.warning(f"Failed to load URL cache: {e}")
        return set()

    def save_scraped_urls(self, new_urls: Dict[str, str]):
        """Save scraped URLs with timestamps"""
        try:
            # Merge the new URLs into any existing cache before writing it back out.
            urls_data = {}
            if self.urls_cache_path.exists():
                with open(self.urls_cache_path, 'r') as f:
                    urls_data = json.load(f)

            urls_data.update(new_urls)

            with open(self.urls_cache_path, 'w') as f:
                json.dump(urls_data, f, indent=2)

            logger.info(f"Saved {len(new_urls)} new URLs to cache")

        except Exception as e:
            logger.error(f"Failed to save URL cache: {e}")

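    # Cache layout illustration (entries are hypothetical): scraped_urls.json maps
    # each article URL to the ISO-8601 timestamp at which it was scraped, e.g.
    #   {"https://www.example.com/story-1": "2024-05-01T12:00:00"}
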
    def validate_article_quality(self, article: Article) -> Tuple[bool, str]:
        """Validate article quality with comprehensive checks"""
        # Length bounds
        if not article.text or len(article.text.strip()) < self.min_article_length:
            return False, "Article too short"

        if len(article.text) > self.max_article_length:
            return False, "Article too long"

        # Title presence
        if not article.title or len(article.title.strip()) < 10:
            return False, "Missing or inadequate title"

        # Basic content sanity checks
        if not any(c.isalpha() for c in article.text):
            return False, "No alphabetic content"

        if not any(punct in article.text for punct in '.!?'):
            return False, "No sentence structure"

        # Reject text littered with HTML/JavaScript artifacts
        html_patterns = [
            r'<[^>]+>',
            r'&[a-zA-Z]+;',
            r'javascript:',
            r'document\.',
            r'window\.'
        ]

        for pattern in html_patterns:
            if len(re.findall(pattern, article.text)) > 5:
                return False, "Excessive HTML artifacts"

        # Reject text dominated by advertising / boilerplate keywords
        ad_keywords = [
            'advertisement', 'sponsored', 'click here', 'buy now',
            'subscribe', 'newsletter', 'cookies', 'privacy policy'
        ]

        text_lower = article.text.lower()
        ad_count = sum(1 for keyword in ad_keywords if keyword in text_lower)
        if ad_count > 3:
            return False, "Excessive advertising content"

        return True, "Article passed validation"

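    # Example (hypothetical): an article body of only 40 characters fails the first
    # check and returns (False, "Article too short"); a page whose text still contains
    # six or more "<div>"-style tags is rejected as "Excessive HTML artifacts".
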
    def clean_article_text(self, text: str) -> str:
        """Clean and normalize article text"""
        # Collapse all runs of whitespace to single spaces
        text = re.sub(r'\s+', ' ', text)

        # Strip leftover HTML entities such as &amp; or &nbsp;
        text = re.sub(r'&[a-zA-Z]+;', '', text)

        # Normalize repeated punctuation
        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)
        text = re.sub(r'[.]{3,}', '...', text)

        # Drop control characters
        text = ''.join(char for char in text if ord(char) >= 32)

        return text.strip()

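    # Worked example (hypothetical input) for clean_article_text:
    #   "Breaking   news...   read more!!!"  ->  "Breaking news... read more!"
    # Whitespace is collapsed, repeated "!" and "?" are reduced to one, and runs of
    # three or more dots are normalized to a single ellipsis.
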
    def scrape_single_article(self, url: str) -> Optional[Dict]:
        """Scrape a single article with comprehensive error handling"""
        try:
            # Skip URLs we have already scraped
            if url in self.scraped_urls:
                return None

            article = Article(url)
            article.download()
            article.parse()

            # Quality gate
            is_valid, reason = self.validate_article_quality(article)
            if not is_valid:
                logger.debug(f"Article validation failed ({reason}): {url}")
                return None

            # Clean title and body, then combine them into a single text field
            clean_title = self.clean_article_text(article.title)
            clean_text = self.clean_article_text(article.text)
            full_text = f"{clean_title}. {clean_text}"

            article_data = {
                'text': full_text,
                'label': 0,
                'source': urlparse(url).netloc,
                'url': url,
                'title': clean_title,
                'timestamp': datetime.now().isoformat(),
                'word_count': len(full_text.split()),
                'char_count': len(full_text)
            }

            logger.info(f"Successfully scraped article: {clean_title[:50]}...")
            return article_data

        except Exception as e:
            logger.warning(f"Failed to scrape {url}: {str(e)}")
            return None

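    # Record shape illustration (all field values are hypothetical): a successful
    # scrape yields a dict such as
    #   {'text': 'Example headline. Example body ...', 'label': 0,
    #    'source': 'www.example.com', 'url': 'https://www.example.com/story',
    #    'title': 'Example headline', 'timestamp': '2024-05-01T12:00:00',
    #    'word_count': 5, 'char_count': 34}
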
    def scrape_site_articles(self, site_config: Dict) -> List[Dict]:
        """Scrape articles from a single news site"""
        logger.info(f"Starting scraping from {site_config['name']}...")

        articles = []
        scraped_urls = {}

        try:
            # Build the site index and collect candidate article URLs
            paper = build(site_config['url'], memoize_articles=False)
            article_urls = [article.url for article in paper.articles]

            # Drop URLs seen in previous sessions, shuffle, and cap at the site quota
            new_urls = [url for url in article_urls if url not in self.scraped_urls]
            random.shuffle(new_urls)
            urls_to_scrape = new_urls[:site_config['max_articles']]

            logger.info(f"Found {len(urls_to_scrape)} new articles to scrape from {site_config['name']}")

            for i, url in enumerate(urls_to_scrape):
                if len(articles) >= site_config['max_articles']:
                    break

                article_data = self.scrape_single_article(url)

                if article_data:
                    articles.append(article_data)
                    scraped_urls[url] = datetime.now().isoformat()

                # Rate limiting between requests to the same site
                if i < len(urls_to_scrape) - 1:
                    time.sleep(site_config['delay'])

            if scraped_urls:
                self.save_scraped_urls(scraped_urls)

            logger.info(f"Successfully scraped {len(articles)} articles from {site_config['name']}")

        except Exception as e:
            logger.error(f"Error scraping {site_config['name']}: {str(e)}")

        return articles

    def scrape_all_sources(self) -> List[Dict]:
        """Scrape articles from all configured sources"""
        logger.info("Starting comprehensive news scraping...")

        all_articles = []

        for site_config in self.news_sites:
            if len(all_articles) >= self.max_articles_total:
                break

            try:
                site_articles = self.scrape_site_articles(site_config)
                all_articles.extend(site_articles)

                # Brief pause between sites
                if site_config != self.news_sites[-1]:
                    time.sleep(1.0)

            except Exception as e:
                logger.error(f"Error scraping {site_config['name']}: {str(e)}")
                continue

        # Enforce the global article cap
        all_articles = all_articles[:self.max_articles_total]

        logger.info(f"Scraping complete. Total articles: {len(all_articles)}")
        return all_articles

    def save_scraped_articles(self, articles: List[Dict]) -> bool:
        """Save scraped articles with validation"""
        try:
            if not articles:
                return True

            # Run the validation pipeline before persisting anything
            valid_articles, validation_summary = self.validate_scraped_articles(articles)

            logger.info(f"Validation: {len(valid_articles)}/{len(articles)} articles passed validation")

            if not valid_articles:
                logger.warning("No valid articles to save after validation")
                return True

            df_new = pd.DataFrame(valid_articles)

            # Append to any existing output file and de-duplicate on article text
            if self.output_path.exists():
                df_existing = pd.read_csv(self.output_path)
                df_combined = pd.concat([df_existing, df_new], ignore_index=True)
                df_combined = df_combined.drop_duplicates(subset=['text'], keep='first')
            else:
                df_combined = df_new

            df_combined.to_csv(self.output_path, index=False)

            # Persist the validation summary alongside the data
            validation_report_path = self.data_dir / "scraping_validation_report.json"
            with open(validation_report_path, 'w') as f:
                json.dump(validation_summary, f, indent=2)

            logger.info(f"Saved {len(valid_articles)} validated articles to {self.output_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to save validated articles: {e}")
            return False

    def generate_scraping_metadata(self, articles: List[Dict]) -> Dict:
        """Generate metadata about the scraping session"""
        if not articles:
            return {}

        df = pd.DataFrame(articles)

        metadata = {
            'scraping_timestamp': datetime.now().isoformat(),
            'articles_scraped': len(articles),
            'sources': df['source'].value_counts().to_dict(),
            'average_word_count': float(df['word_count'].mean()),
            'total_characters': int(df['char_count'].sum()),
            'scraping_duration': None,
            'quality_score': self.calculate_scraping_quality(df)
        }

        return metadata

    def calculate_scraping_quality(self, df: pd.DataFrame) -> float:
        """Calculate quality score for scraped articles"""
        scores = []

        # Source diversity: fraction of configured sites represented in the batch
        source_diversity = df['source'].nunique() / len(self.news_sites)
        scores.append(source_diversity)

        # Length consistency: penalize high variance in word counts
        word_counts = df['word_count']
        length_score = 1.0 - (word_counts.std() / word_counts.mean())
        scores.append(max(0, min(1, length_score)))

        # Freshness placeholder (articles are scraped live, so assume fresh)
        freshness_score = 1.0
        scores.append(freshness_score)

        return float(sum(scores) / len(scores))

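    # Worked example (hypothetical numbers): 3 distinct sources out of 4 configured
    # sites gives a diversity score of 0.75; word counts with mean 400 and standard
    # deviation 100 give a length score of 1 - 100/400 = 0.75; with the freshness
    # placeholder of 1.0 the overall score is (0.75 + 0.75 + 1.0) / 3 ≈ 0.83.
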
    def scrape_articles(self) -> Tuple[bool, str]:
        """Main scraping function with comprehensive error handling"""
        start_time = time.time()

        try:
            logger.info("Starting news scraping process...")

            articles = self.scrape_all_sources()

            if not articles:
                logger.warning("No articles were scraped successfully")
                return False, "No articles scraped"

            if not self.save_scraped_articles(articles):
                return False, "Failed to save articles"

            # Record session metadata, including how long the run took
            metadata = self.generate_scraping_metadata(articles)
            metadata['scraping_duration'] = time.time() - start_time

            try:
                with open(self.metadata_path, 'w') as f:
                    json.dump(metadata, f, indent=2)
            except Exception as e:
                logger.warning(f"Failed to save metadata: {e}")

            success_msg = f"Successfully scraped {len(articles)} articles"
            logger.info(success_msg)

            return True, success_msg

        except Exception as e:
            error_msg = f"Scraping process failed: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

    def validate_scraped_articles(self, articles: List[Dict]) -> Tuple[List[Dict], Dict]:
        """Validate scraped articles using validation schemas"""
        if not articles:
            return articles, {}

        validator = DataValidationPipeline()

        # Ensure every record carries the fields the validator expects
        enhanced_articles = []
        for article in articles:
            enhanced_article = article.copy()
            if 'source' not in enhanced_article:
                enhanced_article['source'] = 'scraped_real'
            if 'label' not in enhanced_article:
                enhanced_article['label'] = 0
            enhanced_articles.append(enhanced_article)

        validation_result = validator.validate_scraped_data(enhanced_articles, "web_scraping")

        # Keep only the articles that passed, annotated with their quality score
        valid_articles = []
        for i, result in enumerate(validation_result.validation_results):
            if result.is_valid:
                article = enhanced_articles[i].copy()
                article['validation_quality_score'] = result.quality_metrics.get('overall_quality_score', 0.0)
                valid_articles.append(article)

        validation_summary = {
            'original_count': len(articles),
            'valid_count': len(valid_articles),
            'success_rate': validation_result.success_rate,
            'overall_quality_score': validation_result.overall_quality_score
        }

        return valid_articles, validation_summary

def scrape_articles():
    """Main function for external calls"""
    scraper = RobustNewsScraper()
    success, message = scraper.scrape_articles()

    if success:
        print(f"✅ {message}")
    else:
        print(f"❌ {message}")

    return success

def main():
    """Main execution function"""
    scraper = RobustNewsScraper()
    success, message = scraper.scrape_articles()

    if success:
        print(f"✅ {message}")
    else:
        print(f"❌ {message}")
        exit(1)


if __name__ == "__main__":
    main()
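
# Usage sketch (the module filename is an assumption, not given in the source):
#   python news_scraper.py                  # run a full scraping session
# or, from another module:
#   from news_scraper import scrape_articles
#   scrape_articles()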