File size: 17,419 Bytes
2c72e40
 
 
2d023d0
2c72e40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
import logging
import threading
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, List, Optional

import schedule

from app import app
from models import db, ScheduledJob

logger = logging.getLogger(__name__)

class SchedulerService:
    """Service for managing scheduled scraping jobs.

    Jobs are registered with the ``schedule`` module's default scheduler and
    executed by a daemon thread that calls ``run_pending`` roughly once per
    second. Job definitions are persisted in the ``ScheduledJob`` table, and
    enabled rows are re-registered by :meth:`start`.
    """

    # Zodiac signs processed by the consolidation job.
    _ZODIAC_SIGNS = (
        "aries", "taurus", "gemini", "cancer",
        "leo", "virgo", "libra", "scorpio",
        "sagittarius", "capricorn", "aquarius", "pisces",
    )

    def __init__(self):
        """Initialize the scheduler service (the background thread is not started here)."""
        self.scheduler = schedule  # the schedule module itself, i.e. its default scheduler
        self.running = False
        self.thread = None
        self.jobs = {}  # schedule.Job references keyed by job name

    def start(self):
        """Start the scheduler loop in a daemon thread and restore enabled jobs from the database."""
        if self.running:
            logger.warning("Scheduler already running")
            return

        self.running = True
        self.thread = threading.Thread(target=self._run_schedule, daemon=True)
        self.thread.start()
        logger.info("Scheduler started")

        # Re-register jobs that were persisted by earlier add_job calls
        self._load_jobs_from_db()

    def stop(self):
        """Stop the scheduler loop and clear every registered job.

        Waits up to 5 seconds for the background thread to exit. Database rows
        are not modified, so enabled jobs are restored by the next start().
        """
        if not self.running:
            logger.warning("Scheduler not running")
            return

        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
            self.thread = None

        # Clear all jobs
        self.scheduler.clear()
        self.jobs = {}

        logger.info("Scheduler stopped")

    def _run_schedule(self):
        """Run pending jobs about once per second until ``self.running`` becomes False."""
        while self.running:
            try:
                self.scheduler.run_pending()
            except Exception:
                # An exception escaping here would end the thread and silently stop all jobs.
                logger.exception("Unexpected error in scheduler loop")
            time.sleep(1)

    def _load_jobs_from_db(self):
        """Register every enabled ScheduledJob row with the scheduler (without re-writing the DB)."""
        try:
            with app.app_context():
                jobs = ScheduledJob.query.filter_by(enabled=True).all()
                loaded = 0
                for job in jobs:
                    # job_func=None: add_job resolves the function from the job name
                    if self.add_job(job.name, job.frequency, None, update_db=False):
                        loaded += 1
                logger.info(f"Loaded {loaded} of {len(jobs)} scheduled jobs from database")
        except Exception as e:
            logger.error(f"Error loading jobs from database: {str(e)}")

    def add_job(self, name: str, frequency: str, job_func: Optional[Callable] = None, update_db: bool = True) -> bool:
        """
        Add a new scheduled job

        Args:
            name: Unique name for the job
            frequency: Frequency string ('daily', 'hourly', 'weekly', '<N> minutes', '<N> hours')
            job_func: Function to call when job runs; if None, it is looked up by ``name``
            update_db: Whether to create/update the ScheduledJob row for this job

        Returns:
            True on success. On failure, nothing stays registered in the scheduler.
        """
        if name in self.jobs:
            logger.warning(f"Job '{name}' already exists")
            return False

        # Map job name to function if not provided
        if job_func is None:
            job_func = self._get_job_function(name)
            if job_func is None:
                logger.error(f"No function mapped for job '{name}'")
                return False

        scheduled_job = self._schedule_by_frequency(name, frequency, job_func)
        if scheduled_job is None:
            logger.error(f"Failed to schedule job '{name}' with frequency '{frequency}'")
            return False

        self.jobs[name] = scheduled_job

        if update_db and not self._persist_job(name, frequency):
            # Undo the in-memory registration so state matches the failure reported to the caller
            self.scheduler.cancel_job(scheduled_job)
            del self.jobs[name]
            return False

        logger.info(f"Added job '{name}' with frequency '{frequency}'")
        return True

    def _persist_job(self, name: str, frequency: str) -> bool:
        """Create or re-enable the ScheduledJob row for ``name``. Return True on success."""
        next_run = self._calculate_next_run(frequency)
        try:
            with app.app_context():
                job = ScheduledJob.query.filter_by(name=name).first()
                if job is None:
                    job = ScheduledJob()
                    job.name = name
                    db.session.add(job)
                job.frequency = frequency
                job.next_run = next_run  # refreshed for existing rows too, since the job was just rescheduled
                job.enabled = True
                db.session.commit()
                logger.info(f"Added job '{name}' to database")
        except Exception as e:
            logger.error(f"Error adding job to database: {str(e)}")
            return False
        return True

    def remove_job(self, name: str) -> bool:
        """
        Remove a scheduled job

        The job is cancelled in memory first. Its database row is then disabled
        (not deleted).

        Args:
            name: Name of the job to remove

        Returns:
            True on success. False if the job is unknown or the database update
            failed (the in-memory job is already cancelled in the latter case).
        """
        if name not in self.jobs:
            logger.warning(f"Job '{name}' not found")
            return False

        # Cancel the job
        self.scheduler.cancel_job(self.jobs.pop(name))

        # Update database
        try:
            with app.app_context():
                job = ScheduledJob.query.filter_by(name=name).first()
                if job:
                    job.enabled = False
                    db.session.commit()
                    logger.info(f"Disabled job '{name}' in database")
                else:
                    logger.warning(f"Job '{name}' not found in database")
        except Exception as e:
            logger.error(f"Error removing job from database: {str(e)}")
            return False

        logger.info(f"Removed job '{name}'")
        return True

    def get_all_jobs(self) -> List[Dict[str, Any]]:
        """Return every ScheduledJob row as a dict, with ``active`` set when it is registered in memory."""
        job_list = []

        try:
            with app.app_context():
                for job in ScheduledJob.query.all():
                    job_info = job.to_dict()
                    job_info["active"] = job.name in self.jobs
                    job_list.append(job_info)
        except Exception as e:
            logger.error(f"Error getting jobs from database: {str(e)}")

        return job_list

    def update_job_status(self, name: str, success: bool) -> bool:
        """
        Record a completed run: set last_run to now (UTC) and recompute next_run.

        Args:
            name: Name of the job
            success: Whether the job ran successfully. It is only logged here;
                NOTE(review): no status column on ScheduledJob is written.

        Returns:
            True if the database row was updated, False if it is missing or an error occurred
        """
        try:
            with app.app_context():
                job = ScheduledJob.query.filter_by(name=name).first()
                if job is None:
                    logger.warning(f"Job '{name}' not found in database")
                    return False
                job.last_run = self._utcnow()
                job.next_run = self._calculate_next_run(job.frequency)
                db.session.commit()
                logger.info(f"Updated job '{name}' status (success={success})")
                return True
        except Exception as e:
            logger.error(f"Error updating job status: {str(e)}")
            return False

    def _schedule_by_frequency(self, name: str, frequency: str, job_func: Callable) -> "Optional[schedule.Job]":
        """Register ``job_func`` with the scheduler according to ``frequency``.

        The function is wrapped so that every run is logged and recorded via
        update_job_status.

        Returns:
            The created schedule.Job, or None if ``frequency`` is unsupported or
            its interval is not a positive integer.
        """
        def job_wrapper():
            logger.info(f"Running scheduled job: {name}")
            try:
                result = job_func()
            except Exception as e:
                logger.exception(f"Error running job '{name}': {str(e)}")
                self.update_job_status(name, False)
                return None
            # The built-in job functions report failure by returning False
            self.update_job_status(name, result is not False)
            return result

        if frequency == 'daily':
            return self.scheduler.every().day.at("00:00").do(job_wrapper)
        if frequency == 'hourly':
            return self.scheduler.every().hour.do(job_wrapper)
        if frequency == 'weekly':
            return self.scheduler.every().week.do(job_wrapper)
        if 'minute' in frequency:
            # e.g. '30 minutes' -> 30
            minutes = self._parse_interval(frequency)
            if minutes is None:
                logger.error(f"Invalid minutes format: {frequency}")
                return None
            return self.scheduler.every(minutes).minutes.do(job_wrapper)
        if 'hour' in frequency:
            # e.g. '6 hours' -> 6
            hours = self._parse_interval(frequency)
            if hours is None:
                logger.error(f"Invalid hours format: {frequency}")
                return None
            return self.scheduler.every(hours).hours.do(job_wrapper)

        logger.error(f"Unsupported frequency: {frequency}")
        return None

    @staticmethod
    def _parse_interval(frequency: str) -> Optional[int]:
        """Return the leading positive integer of ``frequency`` ('30 minutes' -> 30), else None."""
        parts = frequency.split()
        if not parts:
            return None
        try:
            value = int(parts[0])
        except ValueError:
            return None
        # A zero/negative interval would make the job fire on every loop iteration
        return value if value > 0 else None

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime (same value as the deprecated utcnow())."""
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _calculate_next_run(self, frequency: str) -> datetime:
        """Estimate the next run time (naive UTC) for ``frequency``.

        The estimate mirrors how _schedule_by_frequency registers the job.
        Unparseable intervals fall back to 30 minutes / 1 hour, and unknown
        frequencies fall back to 1 day.
        """
        frequency = frequency or ""
        now = self._utcnow()

        if frequency == 'daily':
            # .day.at("00:00") fires at *local* midnight; store that instant in UTC
            local_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
            return local_midnight.astimezone(timezone.utc).replace(tzinfo=None)
        if frequency == 'hourly':
            # every().hour is interval-based: one hour after the previous run, not at the top of the hour
            return now + timedelta(hours=1)
        if frequency == 'weekly':
            return now + timedelta(days=7)
        if 'minute' in frequency:
            minutes = self._parse_interval(frequency)
            return now + timedelta(minutes=minutes if minutes is not None else 30)
        if 'hour' in frequency:
            hours = self._parse_interval(frequency)
            return now + timedelta(hours=hours if hours is not None else 1)
        return now + timedelta(days=1)  # Default to daily

    def _get_job_function(self, name: str) -> Optional[Callable]:
        """Return the built-in job function registered under ``name``, or None if there is none.

        Each job function imports the service it needs when it runs.
        """
        job_map = {
            "scrape_daily_horoscopes": self._scrape_daily_horoscopes,
            "consolidate_horoscopes": self._consolidate_horoscopes,
            "export_to_wordpress": self._export_to_wordpress,
        }
        return job_map.get(name)

    def _scrape_daily_horoscopes(self):
        """Job function: scrape all horoscopes for today's local date.

        Returns:
            The result of ``horoscope_service.scrape_all_horoscopes``. Its len()
            is logged, so presumably it is a sized collection (confirm).
        """
        from services.horoscope_service import horoscope_service
        logger.info("Running daily horoscope scraping job")

        today = datetime.today().strftime('%Y-%m-%d')
        results = horoscope_service.scrape_all_horoscopes(date_str=today)

        logger.info(f"Daily horoscope scraping completed: {len(results)} horoscopes scraped")
        return results

    def _consolidate_horoscopes(self):
        """Job function: consolidate today's horoscopes into one ConsolidatedHoroscope per sign via the LLM.

        A sign is skipped if it has no horoscopes for today or already has a
        consolidated row for today. Each new row is committed individually.

        Returns:
            True if all signs were processed, False if an exception aborted the job.
        """
        from services.llm_service import llm_service
        from models import Horoscope, ConsolidatedHoroscope
        import json

        logger.info("Running horoscope consolidation job")

        today = datetime.today().date()

        try:
            with app.app_context():
                for sign in self._ZODIAC_SIGNS:
                    # All horoscopes stored for this sign and date
                    horoscopes = Horoscope.query.filter_by(sign=sign, date=today).all()
                    if not horoscopes:
                        logger.warning(f"No horoscopes found for {sign} on {today}")
                        continue

                    existing = ConsolidatedHoroscope.query.filter_by(sign=sign, date=today).first()
                    if existing:
                        logger.info(f"Horoscopes for {sign} on {today} already consolidated")
                        continue

                    consolidated = llm_service.consolidate_horoscopes([h.to_dict() for h in horoscopes])

                    if not consolidated or "error" in consolidated:
                        # consolidated may be None/empty here, so .get() must not be called on it
                        error = consolidated.get('error', 'Unknown error') if consolidated else 'Unknown error'
                        logger.error(f"Error consolidating horoscopes for {sign}: {error}")
                        continue

                    new_consolidated = ConsolidatedHoroscope()
                    new_consolidated.sign = sign
                    new_consolidated.date = today
                    new_consolidated.consolidated_prediction = consolidated.get("consolidated_prediction", "")
                    new_consolidated.sources = json.dumps([h.source for h in horoscopes])

                    db.session.add(new_consolidated)
                    db.session.commit()

                    logger.info(f"Consolidated horoscope created for {sign} on {today}")

                logger.info("Horoscope consolidation job completed")
                return True
        except Exception as e:
            logger.error(f"Error in consolidation job: {str(e)}")
            return False

    def _export_to_wordpress(self):
        """Job function: publish consolidated horoscopes that have no WordPressExport row yet.

        Each attempt records a WordPressExport row with status "published" or
        "failed".

        Returns:
            True if every candidate was processed, False if an exception aborted the job.
        """
        from services.wordpress_service import wordpress_service
        from models import ConsolidatedHoroscope, WordPressExport

        logger.info("Running WordPress export job")

        try:
            with app.app_context():
                # NOTE(review): any export row excludes a horoscope, including "failed"
                # rows, so failed publishes are never retried automatically. Confirm intended.
                consolidated_horoscopes = (
                    db.session.query(ConsolidatedHoroscope)
                    .outerjoin(WordPressExport, ConsolidatedHoroscope.id == WordPressExport.horoscope_id)
                    .filter(WordPressExport.id.is_(None))
                    .all()
                )

                if not consolidated_horoscopes:
                    logger.info("No new horoscopes to export to WordPress")
                    return True

                logger.info(f"Found {len(consolidated_horoscopes)} horoscopes to export")

                for horoscope in consolidated_horoscopes:
                    result = wordpress_service.publish_horoscope(horoscope)
                    failed = not result or "error" in result

                    export = WordPressExport()
                    export.horoscope_id = horoscope.id
                    if failed:
                        # result may be None/empty here, so .get() must not be called on it
                        error = result.get('error', 'Unknown error') if result else 'Unknown error'
                        logger.error(f"Error exporting horoscope {horoscope.id} to WordPress: {error}")
                        export.status = "failed"
                    else:
                        export.wordpress_post_id = result.get("post_id")
                        export.wordpress_url = result.get("url")
                        export.status = "published"
                    db.session.add(export)
                    db.session.commit()

                    if not failed:
                        logger.info(f"Exported horoscope {horoscope.id} to WordPress as post {result.get('post_id')}")

                logger.info("WordPress export job completed")
                return True
        except Exception as e:
            logger.error(f"Error in WordPress export job: {str(e)}")
            return False


# Shared, module-level SchedulerService instance (its thread is not started at import time).
scheduler_service: SchedulerService = SchedulerService()