Spaces:
Running
Running
import logging | |
import json | |
from typing import Dict, Any, List, Optional | |
from datetime import datetime, date | |
import time | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from scrapers.horoscope_scraper import HoroscopeScraper | |
from scrapers.astrology_com_scraper import AstrologyComScraper | |
from scrapers.horoscope_com_scraper import HoroscopeComScraper | |
from utils.rate_limiter import RateLimiter | |
from models import db, Horoscope | |
from app import app | |
# Module-level logger named after this module; HoroscopeService writes all of
# its info/warning/error messages through it.
logger = logging.getLogger(__name__)
class HoroscopeService:
    """Coordinate horoscope scraping, persistence and retrieval.

    The service keeps one scraper per registered source, throttles requests
    per source through a ``RateLimiter``, stores successful scrape results as
    ``Horoscope`` rows, and answers lookups from the database (scraping on
    demand when nothing is stored yet).
    """

    def __init__(self, max_workers: int = 3):
        """Initialize the horoscope service.

        Args:
            max_workers: Maximum number of scraping tasks run concurrently.
        """
        self.max_workers = max_workers
        # One RateLimiter per source name, created lazily on first use.
        self.rate_limiters = {}
        # Registry of available scrapers, keyed by source name.
        self.scrapers = {
            "astrology.com": AstrologyComScraper(),
            "horoscope.com": HoroscopeComScraper(),
        }

    @staticmethod
    def _parse_date(date_str: Optional[str]) -> date:
        """Convert a ``YYYY-MM-DD`` string to a ``date``.

        Args:
            date_str: Date string, or ``None``/empty to mean "today".

        Returns:
            The parsed date, or ``date.today()`` when ``date_str`` is falsy.

        Raises:
            ValueError: If ``date_str`` is non-empty but not ``YYYY-MM-DD``.
        """
        if date_str:
            return datetime.strptime(date_str, '%Y-%m-%d').date()
        return date.today()

    def _get_rate_limiter(self, domain: str) -> "RateLimiter":
        """Return the rate limiter for ``domain``, creating it on first use.

        Args:
            domain: Key identifying the limiter (callers pass the source name).

        Returns:
            The single ``RateLimiter`` instance shared by all requests made
            for that key.
        """
        limiter = self.rate_limiters.get(domain)
        if limiter is None:
            # Default budget: max_requests=5 per window_size=60 (intended as
            # 5 requests per minute). setdefault keeps whichever limiter was
            # stored first, so worker threads racing on the first request
            # for a source all end up sharing one limiter instead of each
            # recording against its own copy.
            limiter = self.rate_limiters.setdefault(
                domain, RateLimiter(window_size=60, max_requests=5)
            )
        return limiter

    def scrape_sign(self, source: str, sign: str, date_str: Optional[str] = None) -> Dict[str, Any]:
        """Scrape the horoscope for one sign from one source.

        Successful results (``result['success']`` truthy) are also written to
        the database.

        Args:
            source: Source name (e.g., 'astrology.com').
            sign: Zodiac sign.
            date_str: Optional date string (YYYY-MM-DD).

        Returns:
            The scraper's result dictionary, or
            ``{"success": False, "error": ...}`` for an unknown source.

        Raises:
            Exception: Whatever the underlying scraper raises is propagated.
        """
        scraper = self.scrapers.get(source)
        if scraper is None:
            return {"success": False, "error": f"Unknown source: {source}"}
        base_url = scraper.base_url

        # Apply rate limiting: wait once for the limiter's suggested delay.
        rate_limiter = self._get_rate_limiter(source)
        if not rate_limiter.can_proceed():
            wait_time = rate_limiter.get_wait_time()
            logger.warning(f"Rate limit reached for {source}. Waiting {wait_time:.2f} seconds")
            time.sleep(wait_time)

        try:
            result = scraper.scrape_sign(base_url, sign, date_str)
        finally:
            # Count the attempt even when the scraper raised; otherwise a
            # failing source would never use up its request budget.
            rate_limiter.record_request()

        if result.get('success', False):
            self._save_to_database(result, source, sign, date_str)
        return result

    def _scrape_concurrently(self, targets: List[tuple], date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """Run ``scrape_sign`` for every ``(source, sign)`` pair on a thread pool.

        Args:
            targets: ``(source, sign)`` pairs to scrape.
            date_str: Optional date string (YYYY-MM-DD) passed to each task.

        Returns:
            One dictionary per target, in completion order. A task that raised
            is reported as a ``{"success": False, ...}`` entry carrying the
            sign, source, error text and a ``time.time()`` timestamp.
        """
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            pending = {
                executor.submit(self.scrape_sign, source, sign, date_str): (source, sign)
                for source, sign in targets
            }
            for future in as_completed(pending):
                source, sign = pending[future]
                try:
                    outcome = future.result()
                except Exception as e:
                    logger.exception("Exception scraping %s from %s: %s", sign, source, e)
                    outcome = {
                        "success": False,
                        "sign": sign,
                        "source": source,
                        "error": str(e),
                        "scraped_at": time.time(),
                    }
                else:
                    logger.info(f"Completed scraping {sign} horoscope from {source}")
                results.append(outcome)
        return results

    def scrape_all_signs(self, source: str, date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """Scrape horoscopes for all zodiac signs from a specific source.

        Args:
            source: Source name (e.g., 'astrology.com').
            date_str: Optional date string (YYYY-MM-DD).

        Returns:
            List of result dictionaries, one per sign in the scraper's
            ``ZODIAC_SIGNS``; a single error entry for an unknown source.
        """
        scraper = self.scrapers.get(source)
        if scraper is None:
            return [{"success": False, "error": f"Unknown source: {source}"}]
        targets = [(source, sign) for sign in scraper.ZODIAC_SIGNS]
        return self._scrape_concurrently(targets, date_str)

    def scrape_sign_from_all_sources(self, sign: str, date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """Scrape the horoscope for one sign from every registered source.

        Args:
            sign: Zodiac sign.
            date_str: Optional date string (YYYY-MM-DD).

        Returns:
            List of result dictionaries, one per registered source.
        """
        targets = [(source, sign) for source in self.scrapers]
        return self._scrape_concurrently(targets, date_str)

    def scrape_all_horoscopes(self, date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """Scrape horoscopes for all signs from all sources.

        Sources are processed one after another; the signs of each source are
        scraped concurrently.

        Args:
            date_str: Optional date string (YYYY-MM-DD).

        Returns:
            Flat list of result dictionaries across all sources.
        """
        all_results = []
        for source in self.scrapers:
            all_results.extend(self.scrape_all_signs(source, date_str))
        return all_results

    def _save_to_database(self, result: Dict[str, Any], source: str, sign: str, date_str: Optional[str] = None) -> None:
        """Insert or update the ``Horoscope`` row for a scrape result.

        The row is keyed by lower-cased sign, date and source. The date comes
        from ``date_str`` when given, otherwise from ``result['date']``, and
        falls back to today when neither is usable. Failures are logged and
        never propagated to the caller.

        Args:
            result: Scraper result; ``prediction`` and ``date`` are read.
            source: Source name the result came from.
            sign: Zodiac sign.
            date_str: Optional date string (YYYY-MM-DD).
        """
        try:
            prediction = result.get('prediction', '')
            # A missing, None or empty scraper date falls back to today
            # instead of failing the parse and dropping the row.
            horoscope_date = self._parse_date(date_str or result.get('date'))
            sign_key = sign.lower()

            with app.app_context():
                try:
                    existing = Horoscope.query.filter_by(
                        sign=sign_key,
                        date=horoscope_date,
                        source=source,
                    ).first()
                    if existing:
                        existing.prediction = prediction
                        action = "Updated"
                    else:
                        horoscope = Horoscope()
                        horoscope.sign = sign_key
                        horoscope.date = horoscope_date
                        horoscope.prediction = prediction
                        horoscope.source = source
                        db.session.add(horoscope)
                        action = "Added"
                    db.session.commit()
                except Exception:
                    # Discard the failed transaction so the session is not
                    # left holding a half-applied write.
                    db.session.rollback()
                    raise
                logger.info(f"{action} horoscope for {sign} on {horoscope_date} from {source}")
        except Exception as e:
            logger.exception("Error saving horoscope to database: %s", e)

    def get_horoscope(self, sign: str, date_str: Optional[str] = None, source: Optional[str] = None) -> Dict[str, Any]:
        """Retrieve a horoscope from the database, scraping it if missing.

        Args:
            sign: Zodiac sign.
            date_str: Optional date string (YYYY-MM-DD); defaults to today.
            source: Optional source name; all sources are used when omitted.

        Returns:
            The horoscope's ``to_dict()`` when exactly one row matches,
            ``{"horoscopes": [...]}`` when several match, or
            ``{"error": ...}`` when nothing is found or an exception occurs.
        """
        try:
            horoscope_date = self._parse_date(date_str)
            sign_key = sign.lower()

            with app.app_context():
                query = Horoscope.query.filter_by(sign=sign_key, date=horoscope_date)
                if source:
                    query = query.filter_by(source=source)

                horoscopes = query.all()
                if not horoscopes:
                    # Nothing stored yet: scrape, then re-run the same query
                    # so both paths produce the same result shape.
                    if source:
                        self.scrape_sign(source, sign, date_str)
                    else:
                        self.scrape_sign_from_all_sources(sign, date_str)
                    horoscopes = query.all()

                if not horoscopes:
                    return {"error": f"No horoscope found for {sign} on {horoscope_date}"}
                if len(horoscopes) > 1:
                    return {"horoscopes": [h.to_dict() for h in horoscopes]}
                return horoscopes[0].to_dict()
        except Exception as e:
            logger.exception("Error getting horoscope from database: %s", e)
            return {"error": str(e)}

    def get_horoscopes_for_date(self, date_str: Optional[str] = None) -> Dict[str, Any]:
        """Retrieve all horoscopes for a date, scraping them if none are stored.

        Args:
            date_str: Optional date string (YYYY-MM-DD); defaults to today.

        Returns:
            ``{"date": <ISO date>, "horoscopes": {sign: [horoscope dicts]}}``,
            or ``{"error": ...}`` when nothing is found or an exception occurs.
        """
        try:
            horoscope_date = self._parse_date(date_str)

            with app.app_context():
                horoscopes = Horoscope.query.filter_by(date=horoscope_date).all()
                if not horoscopes:
                    # Nothing stored for this date: scrape everything, retry.
                    self.scrape_all_horoscopes(date_str)
                    horoscopes = Horoscope.query.filter_by(date=horoscope_date).all()
                if not horoscopes:
                    return {"error": f"No horoscopes found for {horoscope_date}"}

                # Group the rows by sign.
                grouped = {}
                for horoscope in horoscopes:
                    grouped.setdefault(horoscope.sign, []).append(horoscope.to_dict())
                return {"date": horoscope_date.isoformat(), "horoscopes": grouped}
        except Exception as e:
            logger.exception("Error getting horoscopes for date: %s", e)
            return {"error": str(e)}
# Shared module-level instance, built with the default max_workers=3.
# Importers are expected to use this object; nothing in the class prevents
# additional HoroscopeService instances from being constructed.
horoscope_service = HoroscopeService()