"""Background task scheduler using APScheduler."""
import logging
import json
from typing import Dict, List, Optional, Any
from datetime import datetime

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

import config

logger = logging.getLogger("syllabus")

# Global scheduler instance
scheduler: Optional[AsyncIOScheduler] = None

# Jobs storage file
JOBS_FILE = config.DATA_DIR / "scheduled_jobs.json"

# Crontab expression each job was created with, keyed by job id.
# It is kept here because str(job.trigger) renders as "cron[month='*', ...]",
# which CronTrigger.from_crontab() cannot parse back. Persisting that string
# form made saved jobs impossible to restore.
_job_crons: Dict[str, str] = {}


def get_scheduler() -> AsyncIOScheduler:
    """Return the global scheduler, creating an (unstarted) one if needed."""
    global scheduler
    if scheduler is None:
        scheduler = AsyncIOScheduler()
    return scheduler


async def init_scheduler():
    """Create and start a fresh global scheduler, restoring jobs from JOBS_FILE.

    This replaces any scheduler created earlier by get_scheduler(). If that
    scheduler is already running, it is shut down first. Otherwise a second
    call would leave an orphaned scheduler firing duplicate jobs.
    """
    global scheduler
    if scheduler is not None and scheduler.running:
        scheduler.shutdown()
    scheduler = AsyncIOScheduler()
    # The new scheduler starts empty, so the crontab registry must too.
    _job_crons.clear()

    _restore_jobs()

    scheduler.start()
    logger.info("Scheduler initialized and started")


def _restore_jobs() -> None:
    """Re-register the jobs saved in JOBS_FILE without rewriting the file.

    The file is deliberately not re-saved during restore. That way an entry
    which fails to restore (e.g. its task no longer exists) stays in the file
    instead of being silently dropped. All failures are logged; nothing is
    raised.
    """
    if not JOBS_FILE.exists():
        return
    try:
        with open(JOBS_FILE, 'r', encoding='utf-8') as f:
            saved_jobs = json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSONDecodeError
        logger.error(f"Failed to load scheduled jobs: {e}")
        return
    if not isinstance(saved_jobs, list):
        logger.error(f"Failed to load scheduled jobs: expected a JSON list in {JOBS_FILE}")
        return

    for job_data in saved_jobs:
        if not isinstance(job_data, dict):
            logger.error(f"Skipping malformed scheduled job entry: {job_data!r}")
            continue
        job_id = job_data.get('id')
        restored = _schedule_job(
            job_id=job_id,
            task=job_data.get('task'),
            cron=job_data.get('cron'),
            kwargs=job_data.get('kwargs', {}),
        )
        # _schedule_job reports failure through its return value, never by
        # raising, so the result must be checked explicitly.
        if restored:
            logger.info(f"Restored job: {job_id}")
        else:
            logger.error(f"Failed to restore job {job_id}")


def shutdown_scheduler():
    """Shut down the global scheduler if it is running."""
    global scheduler
    if scheduler and scheduler.running:
        scheduler.shutdown()
        logger.info("Scheduler shutdown")


def add_job(
    job_id: str,
    task: str,
    cron: str,
    kwargs: Optional[Dict[str, Any]] = None
) -> bool:
    """Schedule a named task on a crontab expression and persist it to JOBS_FILE.

    Args:
        job_id: Job identifier. An existing job with the same id is replaced.
        task: Task name, one of the keys of ``_TASKS`` (e.g. "download_show").
        cron: Five-field crontab expression "minute hour day month day_of_week".
            For example, "0 2 * * *" runs daily at 2 AM.
        kwargs: Keyword arguments passed to the task function on each run.

    Returns:
        True if the job was scheduled. False if the task is unknown or the job
        could not be created. Errors are logged, not raised.
    """
    if not _schedule_job(job_id, task, cron, kwargs):
        return False
    _save_jobs()
    return True


def _schedule_job(
    job_id: str,
    task: str,
    cron: str,
    kwargs: Optional[Dict[str, Any]] = None
) -> bool:
    """Register a job with the scheduler and record its crontab, without saving.

    Returns True on success and False on any error (logged).
    """
    try:
        sched = get_scheduler()
        kwargs = kwargs or {}

        task_func = _get_task_function(task)
        if not task_func:
            logger.error(f"Unknown task: {task}")
            return False

        # Parse cron expression (format: minute hour day month day_of_week)
        # Example: "0 2 * * *" = daily at 2 AM
        trigger = CronTrigger.from_crontab(cron)

        # Remove the existing job explicitly. While the scheduler is not yet
        # started, added jobs sit in a pending list, where replace_existing
        # alone would not prevent a duplicate entry from being listed.
        if sched.get_job(job_id):
            sched.remove_job(job_id)

        job = sched.add_job(
            task_func,
            trigger=trigger,
            id=job_id,
            kwargs=kwargs,
            replace_existing=True
        )
        # Key by the scheduler-assigned id in case job_id was None.
        _job_crons[job.id] = cron

        logger.info(f"Added job {job_id}: {task} with cron {cron}")
        return True

    except Exception as e:
        logger.error(f"Error adding job {job_id}: {e}")
        return False


def remove_job(job_id: str) -> bool:
    """Remove a scheduled job and persist the change.

    Returns True if a job was removed, False if none existed or removal failed.
    """
    try:
        sched = get_scheduler()
        if not sched.get_job(job_id):
            return False
        sched.remove_job(job_id)
        _job_crons.pop(job_id, None)
        logger.info(f"Removed job {job_id}")
        _save_jobs()
        return True
    except Exception as e:
        logger.error(f"Error removing job {job_id}: {e}")
        return False


def get_jobs() -> List[Dict[str, Any]]:
    """Return a summary dict for every scheduled job (empty list on error)."""
    try:
        sched = get_scheduler()
        return [
            {
                "job_id": job.id,
                "name": job.name,
                "trigger": str(job.trigger),
                "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
                "kwargs": job.kwargs
            }
            for job in sched.get_jobs()
        ]
    except Exception as e:
        logger.error(f"Error getting jobs: {e}")
        return []


def _get_task_function(task_name: str):
    """Return the task coroutine function registered under task_name, or None."""
    return _TASKS.get(task_name)


# NOTE(review): the dropout.* calls in the tasks below are made without
# `await`, so they presumably run synchronously and block the event loop for
# the whole download. If so, consider offloading them with asyncio.to_thread().

async def _task_download_show(show: str, season: int, specials: bool = False):
    """Task: Download a specific show season."""
    try:
        from download import dropout
        logger.info(f"Running scheduled download for {show} season {season}")
        dropout.show(show, season, specials)
    except Exception as e:
        # Job boundary: log with traceback instead of propagating to the scheduler.
        logger.exception(f"Scheduled download failed: {e}")


async def _task_download_latest(show: str):
    """Task: Download the latest season of a show, if one can be determined."""
    try:
        from download import dropout
        from routes.api import get_show_data, get_latest_season
        logger.info(f"Running scheduled download for latest season of {show}")
        show_data = await get_show_data(show, force=False)
        if show_data:
            season = get_latest_season(show_data)
            if season:
                dropout.show(show, season)
    except Exception as e:
        logger.exception(f"Scheduled latest download failed: {e}")


async def _task_update_series():
    """Task: Update series list (posters are not force-downloaded)."""
    try:
        from download import dropout
        logger.info("Running scheduled series update")
        dropout.series(force_download=False)
    except Exception as e:
        logger.exception(f"Scheduled series update failed: {e}")


async def _task_update_posters():
    """Task: Force update all show posters."""
    try:
        from download import dropout
        logger.info("Running scheduled poster update")
        dropout.series(force_download=True)
    except Exception as e:
        logger.exception(f"Scheduled poster update failed: {e}")


# Single source of truth for task name <-> function. Used by
# _get_task_function (name -> function) and _extract_task_name
# (function -> name).
_TASKS = {
    "download_show": _task_download_show,
    "download_latest": _task_download_latest,
    "update_series": _task_update_series,
    "update_posters": _task_update_posters,
}


def _save_jobs():
    """Write every scheduled job to JOBS_FILE so init_scheduler can restore it.

    Each entry is {"id", "task", "cron", "kwargs"}, where "cron" is the crontab
    expression originally given to add_job(). Jobs with no recorded crontab
    (e.g. added to the scheduler directly by other code) could not be restored,
    so they are skipped with a warning.

    The data is written to a temporary sibling file and then renamed over
    JOBS_FILE, so a crash mid-write cannot leave a truncated file. Errors are
    logged, not raised.
    """
    try:
        sched = get_scheduler()
        jobs_data = []
        for job in sched.get_jobs():
            cron = _job_crons.get(job.id)
            if cron is None:
                logger.warning(f"Not persisting job {job.id}: no crontab expression recorded")
                continue
            jobs_data.append({
                "id": job.id,
                "task": _extract_task_name(job.func.__name__),
                "cron": cron,
                "kwargs": job.kwargs
            })

        tmp_path = JOBS_FILE.with_name(JOBS_FILE.name + ".tmp")
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(jobs_data, f, indent=4)
        tmp_path.replace(JOBS_FILE)

        logger.debug(f"Saved {len(jobs_data)} jobs to persistent storage")
    except Exception as e:
        logger.error(f"Error saving jobs: {e}")


def _extract_task_name(func_name: str) -> str:
    """Map a task function's __name__ back to its task name.

    Names not found in _TASKS are returned unchanged.
    """
    for name, func in _TASKS.items():
        if func.__name__ == func_name:
            return name
    return func_name