import json
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple

from scrapers.horoscope_scraper import HoroscopeScraper
from scrapers.astrology_com_scraper import AstrologyComScraper
from scrapers.horoscope_com_scraper import HoroscopeComScraper
from utils.rate_limiter import RateLimiter
from models import db, Horoscope
from app import app

logger = logging.getLogger(__name__)

# Date format used for every date string this service parses.
_DATE_FORMAT = '%Y-%m-%d'


class HoroscopeService:
    """Service to manage horoscope scraping operations"""

    def __init__(self, max_workers: int = 3):
        """
        Initialize horoscope service

        Args:
            max_workers: Maximum number of concurrent scrapers
        """
        self.max_workers = max_workers
        self.rate_limiters = {}  # Domain-specific rate limiters
        # scrape_sign runs on ThreadPoolExecutor workers, so creation of the
        # per-domain limiter must be serialized.
        self._rate_limiters_lock = threading.Lock()

        # Register available scrapers
        self.scrapers = {
            "astrology.com": AstrologyComScraper(),
            "horoscope.com": HoroscopeComScraper(),
        }

    @staticmethod
    def _parse_date(date_str: Optional[str]) -> date:
        """Parse a YYYY-MM-DD string; fall back to today when it is empty/None."""
        if date_str:
            return datetime.strptime(date_str, _DATE_FORMAT).date()
        return date.today()

    def _get_rate_limiter(self, domain: str) -> RateLimiter:
        """Get or create a rate limiter for a specific domain"""
        # Without the lock, several worker threads could each see the domain
        # as missing and create their own limiter; all but the last one
        # stored would be discarded along with the requests recorded on them.
        with self._rate_limiters_lock:
            limiter = self.rate_limiters.get(domain)
            if limiter is None:
                # Default: 5 requests per minute for each domain
                limiter = RateLimiter(window_size=60, max_requests=5)
                self.rate_limiters[domain] = limiter
            return limiter

    def scrape_sign(self, source: str, sign: str,
                    date_str: Optional[str] = None) -> Dict[str, Any]:
        """
        Scrape horoscope for a specific sign from a specific source

        Args:
            source: Source name (e.g., 'astrology.com')
            sign: Zodiac sign
            date_str: Optional date string (YYYY-MM-DD)

        Returns:
            Dictionary with horoscope data, or a dictionary with
            ``success: False`` and an ``error`` message for an unknown source

        Raises:
            Whatever the underlying scraper raises; it is not caught here.
        """
        if source not in self.scrapers:
            return {"success": False, "error": f"Unknown source: {source}"}

        scraper = self.scrapers[source]
        base_url = scraper.base_url

        # Apply rate limiting
        rate_limiter = self._get_rate_limiter(source)
        if not rate_limiter.can_proceed():
            wait_time = rate_limiter.get_wait_time()
            logger.warning(
                "Rate limit reached for %s. Waiting %.2f seconds",
                source, wait_time,
            )
            # time.sleep rejects negative values; clamp defensively.
            time.sleep(max(wait_time, 0))

        # Perform scraping. The request is recorded even when the scraper
        # raises, so failing requests still count against the limit.
        try:
            result = scraper.scrape_sign(base_url, sign, date_str)
        finally:
            rate_limiter.record_request()

        # Save to database if successful
        if result.get('success', False):
            self._save_to_database(result, source, sign, date_str)

        return result

    def _scrape_concurrently(self, tasks: List[Tuple[str, str]],
                             date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Run scrape_sign for each (source, sign) pair on a thread pool

        Args:
            tasks: (source, sign) pairs to scrape
            date_str: Optional date string (YYYY-MM-DD)

        Returns:
            One result dictionary per task, in completion order. A task that
            raised is represented by a failure dictionary instead.
        """
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit scraping tasks
            future_to_task = {
                executor.submit(self.scrape_sign, source, sign, date_str): (source, sign)
                for source, sign in tasks
            }

            # Collect results as they complete
            for future in as_completed(future_to_task):
                source, sign = future_to_task[future]
                try:
                    result = future.result()
                except Exception as e:
                    logger.exception(
                        "Exception scraping %s from %s: %s", sign, source, e
                    )
                    results.append({
                        "success": False,
                        "sign": sign,
                        "source": source,
                        "error": str(e),
                        "scraped_at": time.time(),
                    })
                else:
                    results.append(result)
                    logger.info(
                        "Completed scraping %s horoscope from %s", sign, source
                    )

        return results

    def scrape_all_signs(self, source: str,
                         date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Scrape horoscopes for all zodiac signs from a specific source

        Args:
            source: Source name (e.g., 'astrology.com')
            date_str: Optional date string (YYYY-MM-DD)

        Returns:
            List of dictionaries with horoscope data
        """
        if source not in self.scrapers:
            return [{"success": False, "error": f"Unknown source: {source}"}]

        zodiac_signs = self.scrapers[source].ZODIAC_SIGNS
        return self._scrape_concurrently(
            [(source, sign) for sign in zodiac_signs], date_str
        )

    def scrape_sign_from_all_sources(self, sign: str,
                                     date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Scrape horoscope for a specific sign from all available sources

        Args:
            sign: Zodiac sign
            date_str: Optional date string (YYYY-MM-DD)

        Returns:
            List of dictionaries with horoscope data
        """
        return self._scrape_concurrently(
            [(source, sign) for source in self.scrapers], date_str
        )

    def scrape_all_horoscopes(self, date_str: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Scrape horoscopes for all signs from all sources

        Args:
            date_str: Optional date string (YYYY-MM-DD)

        Returns:
            List of dictionaries with horoscope data
        """
        all_results = []
        # Sources are processed one after another; signs within a source
        # are scraped concurrently by scrape_all_signs.
        for source in self.scrapers:
            all_results.extend(self.scrape_all_signs(source, date_str))
        return all_results

    def _save_to_database(self, result: Dict[str, Any], source: str, sign: str,
                          date_str: Optional[str] = None) -> None:
        """
        Save horoscope data to database

        Inserts a new row, or updates the prediction of the existing row for
        the same sign, date and source. Failures are logged, not raised.

        Args:
            result: Scraper result; 'prediction' and 'date' keys are read
            source: Source name the result came from
            sign: Zodiac sign (stored lower-cased)
            date_str: Optional date string (YYYY-MM-DD); takes precedence
                over the date reported by the scraper
        """
        try:
            # Extract data from result
            prediction = result.get('prediction', '')

            # Explicit date first, then the scraper's date, then today.
            # A missing, None or empty scraper date falls back to today
            # instead of failing inside strptime.
            horoscope_date = self._parse_date(date_str or result.get('date'))
            normalized_sign = sign.lower()

            with app.app_context():
                try:
                    # Check if horoscope already exists for this sign, date, and source
                    existing = Horoscope.query.filter_by(
                        sign=normalized_sign,
                        date=horoscope_date,
                        source=source
                    ).first()

                    if existing:
                        # Update existing horoscope
                        existing.prediction = prediction
                        action = "Updated"
                    else:
                        # Create new horoscope
                        horoscope = Horoscope()
                        horoscope.sign = normalized_sign
                        horoscope.date = horoscope_date
                        horoscope.prediction = prediction
                        horoscope.source = source
                        db.session.add(horoscope)
                        action = "Added"

                    db.session.commit()
                except Exception:
                    # Leave the session usable after a failed flush/commit.
                    db.session.rollback()
                    raise

                logger.info(
                    "%s horoscope for %s on %s from %s",
                    action, sign, horoscope_date, source,
                )

        except Exception as e:
            logger.exception("Error saving horoscope to database: %s", e)

    def get_horoscope(self, sign: str, date_str: Optional[str] = None,
                      source: Optional[str] = None) -> Dict[str, Any]:
        """
        Retrieve horoscope from database

        If nothing is stored for the requested sign/date (and source), a
        scrape is attempted and the database is queried again.

        Args:
            sign: Zodiac sign
            date_str: Optional date string (YYYY-MM-DD); defaults to today
            source: Optional source name

        Returns:
            Dictionary with horoscope data: a single horoscope dict, a
            ``{"horoscopes": [...]}`` dict when several match, or an
            ``{"error": ...}`` dict when nothing is found or an error occurs
        """
        try:
            horoscope_date = self._parse_date(date_str)
            normalized_sign = sign.lower()

            with app.app_context():
                query = Horoscope.query.filter_by(
                    sign=normalized_sign,
                    date=horoscope_date
                )
                if source:
                    query = query.filter_by(source=source)

                horoscopes = query.all()

                if not horoscopes:
                    # If no horoscope found, try to scrape it
                    if source:
                        self.scrape_sign(source, sign, date_str)
                        # Try to fetch again
                        horoscope = Horoscope.query.filter_by(
                            sign=normalized_sign,
                            date=horoscope_date,
                            source=source
                        ).first()
                        if horoscope:
                            return horoscope.to_dict()
                    else:
                        # Try all sources
                        self.scrape_sign_from_all_sources(sign, date_str)
                        # Try to fetch again
                        horoscopes = Horoscope.query.filter_by(
                            sign=normalized_sign,
                            date=horoscope_date
                        ).all()

                if not horoscopes:
                    return {"error": f"No horoscope found for {sign} on {horoscope_date}"}

                # If multiple horoscopes found, return them all
                if len(horoscopes) > 1:
                    return {"horoscopes": [h.to_dict() for h in horoscopes]}
                return horoscopes[0].to_dict()

        except Exception as e:
            logger.exception("Error getting horoscope from database: %s", e)
            return {"error": str(e)}

    def get_horoscopes_for_date(self, date_str: Optional[str] = None) -> Dict[str, Any]:
        """
        Retrieve all horoscopes for a specific date

        If nothing is stored for the date, all sources are scraped and the
        database is queried again.

        Args:
            date_str: Optional date string (YYYY-MM-DD); defaults to today

        Returns:
            Dictionary with the ISO date and horoscopes grouped by sign, or
            an ``{"error": ...}`` dict when nothing is found or an error occurs
        """
        try:
            horoscope_date = self._parse_date(date_str)

            with app.app_context():
                horoscopes = Horoscope.query.filter_by(date=horoscope_date).all()

                if not horoscopes:
                    # If no horoscopes found, try to scrape them
                    self.scrape_all_horoscopes(date_str)
                    # Try to fetch again
                    horoscopes = Horoscope.query.filter_by(date=horoscope_date).all()

                if not horoscopes:
                    return {"error": f"No horoscopes found for {horoscope_date}"}

                # Group by sign
                grouped = {}
                for horoscope in horoscopes:
                    grouped.setdefault(horoscope.sign, []).append(horoscope.to_dict())

                return {"date": horoscope_date.isoformat(), "horoscopes": grouped}

        except Exception as e:
            logger.exception("Error getting horoscopes for date: %s", e)
            return {"error": str(e)}


# Create a singleton instance
horoscope_service = HoroscopeService()