Spaces:
Running
Running
import logging | |
import threading | |
import time | |
import schedule | |
from datetime import datetime, timedelta | |
from typing import Dict, Any, List, Callable, Optional | |
from app import app | |
from models import db, ScheduledJob | |
# Module-level logger named after this module, so its records can be filtered
# and configured through the standard logging hierarchy.
logger = logging.getLogger(name=__name__)
class SchedulerService:
    """Service for managing scheduled scraping jobs.

    Jobs are registered with the ``schedule`` module (``self.scheduler``),
    tracked by name in ``self.jobs``, and mirrored to the ``ScheduledJob``
    table so that ``start()`` can re-create the enabled ones. While running,
    a daemon thread calls ``run_pending()`` about once per second.
    """

    def __init__(self):
        """Initialize an idle scheduler service (call ``start()`` to run jobs)."""
        self.scheduler = schedule
        self.running = False
        self.thread = None
        self.jobs = {}  # job name -> schedule.Job registered by this service
        # Stop signal for the current loop thread. start() creates a fresh
        # Event every time, so a loop thread left over from an earlier run
        # (e.g. stop() timed out while a long job was still executing) keeps
        # watching its own, already-set event and exits instead of running
        # alongside the new thread.
        self._stop_event = threading.Event()

    def start(self):
        """Start the scheduler in a background thread and load enabled DB jobs.

        Logs a warning and does nothing if the scheduler is already running.
        """
        if self.running:
            logger.warning("Scheduler already running")
            return
        self.running = True
        self._stop_event = threading.Event()
        self.thread = threading.Thread(
            target=self._run_schedule,
            args=(self._stop_event,),
            daemon=True,
        )
        self.thread.start()
        logger.info("Scheduler started")
        # Load jobs from database
        self._load_jobs_from_db()

    def stop(self):
        """Stop the scheduler thread and cancel every scheduled job.

        Waits up to 5 seconds for the loop thread to exit. ``ScheduledJob`` rows
        are left untouched, so enabled jobs are reloaded by the next ``start()``.
        """
        if not self.running:
            logger.warning("Scheduler not running")
            return
        self.running = False
        # Wake the loop right away instead of waiting out its 1-second pause.
        self._stop_event.set()
        thread = self.thread
        self.thread = None
        # A job may call stop() from the scheduler thread itself; joining the
        # current thread would raise RuntimeError.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=5)
            if thread.is_alive():
                logger.warning("Scheduler thread did not exit within 5 seconds")
        # Clear all jobs
        self.scheduler.clear()
        self.jobs = {}
        logger.info("Scheduler stopped")

    def _run_schedule(self, stop_event: Optional[threading.Event] = None):
        """Run the scheduler loop until ``stop()`` is called.

        Args:
            stop_event: Event that ends this loop when set. Defaults to the
                service's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while self.running and not stop_event.is_set():
            self.scheduler.run_pending()
            # Acts as the 1-second pause, but returns as soon as stop() sets the event.
            stop_event.wait(1)

    def _load_jobs_from_db(self):
        """Schedule every enabled ``ScheduledJob`` row without rewriting the DB."""
        try:
            with app.app_context():
                jobs = ScheduledJob.query.filter_by(enabled=True).all()
                loaded = 0
                for job in jobs:
                    if self.add_job(
                        job.name,
                        job.frequency,
                        None,  # resolved by name in add_job
                        update_db=False,  # the row already exists
                    ):
                        loaded += 1
            logger.info(f"Loaded {loaded} of {len(jobs)} scheduled jobs from database")
        except Exception as e:
            logger.exception(f"Error loading jobs from database: {str(e)}")

    def add_job(self, name: str, frequency: str, job_func: Optional[Callable] = None, update_db: bool = True) -> bool:
        """
        Add a new scheduled job

        Args:
            name: Unique name for the job
            frequency: 'daily', 'hourly', 'weekly', '<N> minutes' or
                '<N> hours' (N a positive integer)
            job_func: Function to call when the job runs; if None it is
                looked up by name via _get_job_function
            update_db: Whether to create or update (and enable) the
                ScheduledJob row for this job

        Returns:
            True if the job is now scheduled (and persisted, when update_db is
            True). False otherwise. If the database write fails, the job is
            unscheduled again, so a later retry is not rejected as a duplicate.
        """
        if name in self.jobs:
            logger.warning(f"Job '{name}' already exists")
            return False
        # Map job name to function if not provided
        if job_func is None:
            job_func = self._get_job_function(name)
        if not job_func:
            logger.error(f"No function mapped for job '{name}'")
            return False
        # Schedule job based on frequency
        scheduled_job = self._schedule_by_frequency(name, frequency, job_func)
        if scheduled_job is None:
            logger.error(f"Failed to schedule job '{name}' with frequency '{frequency}'")
            return False
        # Store job reference
        self.jobs[name] = scheduled_job
        # Update database
        if update_db:
            try:
                with app.app_context():
                    next_run = self._calculate_next_run(frequency)
                    existing = ScheduledJob.query.filter_by(name=name).first()
                    if existing:
                        existing.frequency = frequency
                        existing.enabled = True
                        # The frequency may have changed, so refresh next_run too.
                        existing.next_run = next_run
                        db.session.commit()
                    else:
                        job = ScheduledJob()
                        job.name = name
                        job.frequency = frequency
                        job.next_run = next_run
                        job.enabled = True
                        db.session.add(job)
                        db.session.commit()
                        logger.info(f"Added job '{name}' to database")
            except Exception as e:
                logger.error(f"Error adding job to database: {str(e)}")
                # Undo the in-memory registration so the False return is
                # truthful and a retry is not refused as "already exists".
                self.scheduler.cancel_job(scheduled_job)
                del self.jobs[name]
                return False
        logger.info(f"Added job '{name}' with frequency '{frequency}'")
        return True

    def remove_job(self, name: str) -> bool:
        """
        Remove a scheduled job

        Cancels the job in this process and marks its ScheduledJob row as
        disabled, so it is not reloaded by the next start().

        Args:
            name: Name of the job to remove

        Returns:
            True on success. False if the job is not scheduled here, or if the
            database update failed (the job is already cancelled in memory by
            then).
        """
        if name not in self.jobs:
            logger.warning(f"Job '{name}' not found")
            return False
        # Cancel the job
        self.scheduler.cancel_job(self.jobs.pop(name))
        # Update database
        try:
            with app.app_context():
                job = ScheduledJob.query.filter_by(name=name).first()
                if job:
                    job.enabled = False
                    db.session.commit()
                    logger.info(f"Disabled job '{name}' in database")
                else:
                    logger.warning(f"Job '{name}' not found in database")
        except Exception as e:
            logger.error(f"Error removing job from database: {str(e)}")
            return False
        logger.info(f"Removed job '{name}'")
        return True

    def get_all_jobs(self) -> List[Dict[str, Any]]:
        """Return every ScheduledJob row as a dict.

        Each dict is ``job.to_dict()`` plus an ``"active"`` flag that is True
        when the job is currently scheduled in this process. If the query fails,
        the rows collected so far are returned.
        """
        job_list = []
        try:
            with app.app_context():
                for job in ScheduledJob.query.all():
                    job_info = job.to_dict()
                    job_info["active"] = job.name in self.jobs
                    job_list.append(job_info)
        except Exception as e:
            logger.error(f"Error getting jobs from database: {str(e)}")
        return job_list

    def update_job_status(self, name: str, success: bool) -> bool:
        """
        Update job status after running

        Sets ``last_run`` to now (naive UTC) and recomputes ``next_run``.

        Args:
            name: Name of the job
            success: Whether the job ran successfully. NOTE(review): this is
                only logged; no status column for it is visible in this module.

        Returns:
            True if the row was found and updated, False otherwise
        """
        try:
            with app.app_context():
                job = ScheduledJob.query.filter_by(name=name).first()
                if not job:
                    logger.warning(f"Job '{name}' not found in database")
                    return False
                job.last_run = datetime.utcnow()
                job.next_run = self._calculate_next_run(job.frequency)
                db.session.commit()
                outcome = "succeeded" if success else "failed"
                logger.info(f"Updated job '{name}' status (last run {outcome})")
                return True
        except Exception as e:
            logger.error(f"Error updating job status: {str(e)}")
            return False

    @staticmethod
    def _parse_interval(frequency: str) -> Optional[int]:
        """Return N from an '<N> minutes' / '<N> hours' string, or None if N is not a positive int."""
        try:
            value = int(frequency.split()[0])
        except (ValueError, IndexError):
            return None
        return value if value > 0 else None

    def _schedule_by_frequency(self, name: str, frequency: str, job_func: Callable) -> Optional[schedule.Job]:
        """Register *job_func* with the scheduler according to *frequency*.

        Supported values: 'daily' (at 00:00), 'hourly' (at minute 0),
        'weekly', '<N> minutes' and '<N> hours' with N a positive integer.

        Returns:
            The scheduled job, or None if *frequency* is unsupported or its
            interval is not a positive integer.
        """
        def job_wrapper():
            # Runs on the scheduler thread. Exceptions are contained here so a
            # failing job cannot escape run_pending() and end _run_schedule's loop.
            logger.info(f"Running scheduled job: {name}")
            try:
                result = job_func()
            except Exception as e:
                logger.exception(f"Error running job '{name}': {str(e)}")
                self.update_job_status(name, False)
                return None
            # The built-in job functions report handled failures by returning False.
            self.update_job_status(name, result is not False)
            return result

        if frequency == 'daily':
            # NOTE(review): schedule interprets "00:00" in the host's local time,
            # whereas _calculate_next_run records UTC midnight; they agree only
            # on a UTC host.
            return self.scheduler.every().day.at("00:00").do(job_wrapper)
        if frequency == 'hourly':
            # Pin to minute 0 so the real run time matches the "top of the next
            # hour" stored by _calculate_next_run.
            return self.scheduler.every().hour.at(":00").do(job_wrapper)
        if 'minute' in frequency:
            # e.g. '30 minutes' -> 30
            minutes = self._parse_interval(frequency)
            if minutes is None:
                logger.error(f"Invalid minutes format: {frequency}")
                return None
            return self.scheduler.every(minutes).minutes.do(job_wrapper)
        if 'hour' in frequency:
            # e.g. '6 hours' -> 6
            hours = self._parse_interval(frequency)
            if hours is None:
                logger.error(f"Invalid hours format: {frequency}")
                return None
            return self.scheduler.every(hours).hours.do(job_wrapper)
        if frequency == 'weekly':
            return self.scheduler.every().week.do(job_wrapper)
        logger.error(f"Unsupported frequency: {frequency}")
        return None

    def _calculate_next_run(self, frequency: str) -> datetime:
        """Estimate the next run time (naive UTC) for *frequency*.

        Mirrors the cadences used by _schedule_by_frequency. Strings that
        method would reject fall back to a default interval instead of raising.
        """
        now = datetime.utcnow()
        if frequency == 'daily':
            # Next run at midnight
            return now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
        if frequency == 'hourly':
            # Next run at the top of the next hour
            return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
        if 'minute' in frequency:
            minutes = self._parse_interval(frequency)
            return now + timedelta(minutes=minutes if minutes is not None else 30)  # default: 30 minutes
        if 'hour' in frequency:
            hours = self._parse_interval(frequency)
            return now + timedelta(hours=hours if hours is not None else 1)  # default: 1 hour
        if frequency == 'weekly':
            return now + timedelta(days=7)
        return now + timedelta(days=1)  # Default to daily

    def _get_job_function(self, name: str) -> Optional[Callable]:
        """Map a built-in job name to its bound job method, or None if unknown."""
        # The job methods import their services lazily, so nothing is imported here.
        job_map = {
            "scrape_daily_horoscopes": self._scrape_daily_horoscopes,
            "consolidate_horoscopes": self._consolidate_horoscopes,
            "export_to_wordpress": self._export_to_wordpress,
        }
        return job_map.get(name)

    def _scrape_daily_horoscopes(self):
        """Job function: scrape all horoscopes for today's (local) date.

        Returns whatever ``horoscope_service.scrape_all_horoscopes`` returns.
        """
        from services.horoscope_service import horoscope_service
        logger.info("Running daily horoscope scraping job")
        # Get today's date
        today = datetime.today().strftime('%Y-%m-%d')
        # _run_schedule does not push a Flask app context, so push one here as
        # the other job functions do. NOTE(review): harmless if the service
        # already manages its own context; confirm whether it needs it at all.
        with app.app_context():
            results = horoscope_service.scrape_all_horoscopes(date_str=today)
            count = len(results) if results else 0
        logger.info(f"Daily horoscope scraping completed: {count} horoscopes scraped")
        return results

    def _consolidate_horoscopes(self):
        """Job function: merge today's horoscopes per zodiac sign using the LLM.

        For each sign that has Horoscope rows for today's (local) date and no
        ConsolidatedHoroscope yet, asks ``llm_service`` for a consolidated
        prediction and stores it together with the JSON-encoded ``source`` of
        each input row. A failure on one sign is rolled back and logged, and the
        remaining signs are still processed.

        Returns:
            True if every sign was handled without an unexpected exception,
            otherwise False.
        """
        from services.llm_service import llm_service
        from models import Horoscope, ConsolidatedHoroscope
        import json
        logger.info("Running horoscope consolidation job")
        today = datetime.today().date()
        signs = ["aries", "taurus", "gemini", "cancer",
                 "leo", "virgo", "libra", "scorpio",
                 "sagittarius", "capricorn", "aquarius", "pisces"]
        failed = False
        try:
            with app.app_context():
                for sign in signs:
                    try:
                        # Find horoscopes for today and this sign
                        horoscopes = Horoscope.query.filter_by(sign=sign, date=today).all()
                        if not horoscopes:
                            logger.warning(f"No horoscopes found for {sign} on {today}")
                            continue
                        # Check if already consolidated
                        existing = ConsolidatedHoroscope.query.filter_by(sign=sign, date=today).first()
                        if existing:
                            logger.info(f"Horoscopes for {sign} on {today} already consolidated")
                            continue
                        horoscope_data = [h.to_dict() for h in horoscopes]
                        consolidated = llm_service.consolidate_horoscopes(horoscope_data)
                        if not consolidated or "error" in consolidated:
                            # consolidated may be None here, so it cannot be .get()-ed blindly.
                            error = (consolidated.get('error', 'Unknown error')
                                     if isinstance(consolidated, dict) else 'Unknown error')
                            logger.error(f"Error consolidating horoscopes for {sign}: {error}")
                            continue
                        # Create new consolidated horoscope
                        sources = [h.source for h in horoscopes]
                        new_consolidated = ConsolidatedHoroscope()
                        new_consolidated.sign = sign
                        new_consolidated.date = today
                        new_consolidated.consolidated_prediction = consolidated.get("consolidated_prediction", "")
                        new_consolidated.sources = json.dumps(sources)
                        db.session.add(new_consolidated)
                        db.session.commit()
                        logger.info(f"Consolidated horoscope created for {sign} on {today}")
                    except Exception as e:
                        # Discard this sign's partial transaction and continue with the next sign.
                        db.session.rollback()
                        failed = True
                        logger.exception(f"Error consolidating horoscopes for {sign}: {str(e)}")
                if failed:
                    logger.warning("Horoscope consolidation job completed with errors")
                else:
                    logger.info("Horoscope consolidation job completed")
            return not failed
        except Exception as e:
            logger.exception(f"Error in consolidation job: {str(e)}")
            return False

    def _export_to_wordpress(self):
        """Job function: publish not-yet-exported consolidated horoscopes.

        A WordPressExport row ("published" or "failed") is written for each
        publish attempt that returns. Horoscopes that already have any export
        row are skipped. If publishing one horoscope raises, its transaction is
        rolled back (no row is written, so a later run retries it) and the
        remaining horoscopes are still processed.

        Returns:
            True if the run finished without unexpected exceptions, else False.
        """
        from services.wordpress_service import wordpress_service
        from models import ConsolidatedHoroscope, WordPressExport
        logger.info("Running WordPress export job")
        failed = False
        try:
            with app.app_context():
                # Consolidated horoscopes that have no WordPressExport row at all
                consolidated_horoscopes = (
                    db.session.query(ConsolidatedHoroscope)
                    .outerjoin(WordPressExport, ConsolidatedHoroscope.id == WordPressExport.horoscope_id)
                    .filter(WordPressExport.id.is_(None))
                    .all()
                )
                if not consolidated_horoscopes:
                    logger.info("No new horoscopes to export to WordPress")
                    return True
                logger.info(f"Found {len(consolidated_horoscopes)} horoscopes to export")
                for horoscope in consolidated_horoscopes:
                    horoscope_id = horoscope.id
                    try:
                        result = wordpress_service.publish_horoscope(horoscope)
                        if not result or "error" in result:
                            # result may be None here, so it cannot be .get()-ed blindly.
                            error = (result.get('error', 'Unknown error')
                                     if isinstance(result, dict) else 'Unknown error')
                            logger.error(f"Error exporting horoscope {horoscope_id} to WordPress: {error}")
                            # NOTE(review): a "failed" row also excludes this horoscope
                            # from the query above, so it is never retried automatically.
                            export = WordPressExport()
                            export.horoscope_id = horoscope_id
                            export.status = "failed"
                            db.session.add(export)
                            db.session.commit()
                            continue
                        # Create successful export record
                        export = WordPressExport()
                        export.horoscope_id = horoscope_id
                        export.wordpress_post_id = result.get("post_id")
                        export.wordpress_url = result.get("url")
                        export.status = "published"
                        db.session.add(export)
                        db.session.commit()
                        logger.info(f"Exported horoscope {horoscope_id} to WordPress as post {result.get('post_id')}")
                    except Exception as e:
                        # Discard this horoscope's partial transaction and move on.
                        db.session.rollback()
                        failed = True
                        logger.exception(f"Error exporting horoscope {horoscope_id} to WordPress: {str(e)}")
                if failed:
                    logger.warning("WordPress export job completed with errors")
                else:
                    logger.info("WordPress export job completed")
            return not failed
        except Exception as e:
            logger.exception(f"Error in WordPress export job: {str(e)}")
            return False
# Shared instance for the application. Creating it only initialises state;
# the background scheduler thread is started separately by calling start().
scheduler_service: SchedulerService = SchedulerService()