syllabus/app/core/scheduler.py
2026-01-08 13:26:15 -05:00

230 lines
6.9 KiB
Python

"""Background task scheduler using APScheduler."""
import logging
import json
from typing import Dict, List, Optional, Any
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import config
# Logger named "syllabus"; every log call in this module goes through it.
logger = logging.getLogger("syllabus")
# Global scheduler instance. Stays None until get_scheduler() or
# init_scheduler() assigns an AsyncIOScheduler to it.
scheduler: Optional[AsyncIOScheduler] = None
# Jobs storage file: JSON written by _save_jobs() and read back by
# init_scheduler() to restore jobs.
JOBS_FILE = config.DATA_DIR / "scheduled_jobs.json"
def get_scheduler() -> AsyncIOScheduler:
    """Return the module-wide scheduler, building it on first access.

    The instance is cached in the module-level ``scheduler`` global, so every
    caller shares the same object. The scheduler is only constructed here; it
    is not started.
    """
    global scheduler
    if scheduler is not None:
        return scheduler
    scheduler = AsyncIOScheduler()
    return scheduler
async def init_scheduler():
    """Create a fresh global scheduler, restore persisted jobs, and start it.

    Jobs are read from ``JOBS_FILE`` (a JSON list of objects with ``id``,
    ``task``, ``cron`` and ``kwargs`` keys) and re-registered through
    ``add_job``. A job that cannot be restored is logged and skipped; it never
    prevents the remaining jobs from loading or the scheduler from starting.
    """
    global scheduler
    scheduler = AsyncIOScheduler()

    # Load previously saved jobs
    saved_jobs = []
    if JOBS_FILE.exists():
        try:
            with open(JOBS_FILE, 'r') as f:
                saved_jobs = json.load(f)
        except Exception as e:
            logger.error(f"Failed to load scheduled jobs: {e}")
            saved_jobs = []
        if not isinstance(saved_jobs, list):
            logger.error("Failed to load scheduled jobs: expected a JSON list")
            saved_jobs = []

    for job_data in saved_jobs:
        if not isinstance(job_data, dict):
            # A stray scalar/list entry must not abort the rest of the restore.
            logger.error(f"Failed to restore job: malformed entry {job_data!r}")
            continue
        job_id = job_data.get('id')
        try:
            restored = add_job(
                job_id=job_id,
                task=job_data.get('task'),
                cron=job_data.get('cron'),
                kwargs=job_data.get('kwargs') or {}
            )
        except Exception as e:
            logger.error(f"Failed to restore job {job_id}: {e}")
            continue
        # add_job signals failure through its return value (it logs and
        # swallows its own exceptions), so success must be checked explicitly.
        if restored:
            logger.info(f"Restored job: {job_id}")
        else:
            logger.error(f"Failed to restore job {job_id}")

    scheduler.start()
    logger.info("Scheduler initialized and started")
def shutdown_scheduler():
    """Stop the global scheduler if one exists and is currently running.

    Does nothing (and logs nothing) when the scheduler was never created or
    is not running.
    """
    global scheduler
    if not scheduler or not scheduler.running:
        return
    scheduler.shutdown()
    logger.info("Scheduler shutdown")
def add_job(
    job_id: str,
    task: str,
    cron: str,
    kwargs: Dict[str, Any] = None
) -> bool:
    """Register (or replace) a cron-scheduled job and persist the job list.

    Args:
        job_id: Identifier for the job; an existing job with the same id is
            removed before the new one is added.
        task: Task name resolved through ``_get_task_function``.
        cron: Five-field crontab expression
            (minute hour day month day_of_week), e.g. ``"0 2 * * *"`` for
            daily at 2 AM.
        kwargs: Keyword arguments handed to the task when it runs; a falsy
            value is treated as an empty dict.

    Returns:
        True when the job was scheduled, False when the task name is unknown
        or any error occurred (errors are logged, not raised).
    """
    try:
        sched = get_scheduler()
        job_kwargs = kwargs if kwargs else {}

        # Resolve the task name to its coroutine before touching the schedule.
        handler = _get_task_function(task)
        if not handler:
            logger.error(f"Unknown task: {task}")
            return False

        cron_trigger = CronTrigger.from_crontab(cron)

        # Drop any job already registered under this id.
        existing = sched.get_job(job_id)
        if existing:
            sched.remove_job(job_id)

        sched.add_job(
            handler,
            trigger=cron_trigger,
            id=job_id,
            kwargs=job_kwargs,
            replace_existing=True
        )
        logger.info(f"Added job {job_id}: {task} with cron {cron}")
        _save_jobs()
        return True
    except Exception as e:
        logger.error(f"Error adding job {job_id}: {e}")
        return False
def remove_job(job_id: str) -> bool:
    """Unschedule the job with the given id and persist the updated job list.

    Returns:
        True when a job was found and removed; False when no such job exists
        or an error occurred (errors are logged, not raised).
    """
    try:
        sched = get_scheduler()
        if not sched.get_job(job_id):
            return False
        sched.remove_job(job_id)
        logger.info(f"Removed job {job_id}")
        _save_jobs()
        return True
    except Exception as e:
        logger.error(f"Error removing job {job_id}: {e}")
        return False
def get_jobs() -> List[Dict[str, Any]]:
    """Return a JSON-friendly summary of every scheduled job.

    Each entry holds ``job_id``, ``name``, ``trigger`` (string form),
    ``next_run_time`` (ISO-8601 string or None) and ``kwargs``.

    Returns:
        The list of job summaries, or an empty list if the scheduler could
        not be queried (the error is logged).
    """
    try:
        sched = get_scheduler()
        summaries = []
        for job in sched.get_jobs():
            # Jobs queued on a scheduler that has not been started yet are
            # "pending" and (in APScheduler 3.x) carry no next_run_time
            # attribute at all; treat that the same as "no next run".
            next_run = getattr(job, "next_run_time", None)
            summaries.append({
                "job_id": job.id,
                "name": job.name,
                "trigger": str(job.trigger),
                "next_run_time": next_run.isoformat() if next_run else None,
                "kwargs": job.kwargs
            })
        return summaries
    except Exception as e:
        logger.error(f"Error getting jobs: {e}")
        return []
def _get_task_function(task_name: str):
    """Look up the coroutine that implements ``task_name``.

    Returns the matching ``_task_*`` function, or None when the name is not
    one of the registered tasks.
    """
    registry = dict(
        download_show=_task_download_show,
        download_latest=_task_download_latest,
        update_series=_task_update_series,
        update_posters=_task_update_posters,
    )
    return registry.get(task_name, None)
async def _task_download_show(show: str, season: int, specials: bool = False):
    """Scheduled task: download one season of a show via ``dropout.show``.

    Args:
        show: Show identifier passed through to ``dropout.show``.
        season: Season number to download.
        specials: Forwarded as the third positional argument of
            ``dropout.show``.

    Any exception is logged and swallowed so a failed run does not propagate
    out of the job.
    """
    try:
        # Imported at call time rather than module load
        # (presumably to avoid an import cycle — confirm).
        from download import dropout

        logger.info(f"Running scheduled download for {show} season {season}")
        # NOTE(review): called without await inside a coroutine; if this is a
        # blocking call it will hold the event loop for the whole download —
        # confirm and consider offloading to an executor.
        dropout.show(show, season, specials)
    except Exception as exc:
        logger.error(f"Scheduled download failed: {exc}")
async def _task_download_latest(show: str):
    """Scheduled task: download the latest season of ``show``.

    Fetches the show's data with ``get_show_data(show, force=False)``,
    derives the latest season with ``get_latest_season`` and hands it to
    ``dropout.show``. When either lookup yields nothing the run is skipped
    with a warning instead of silently doing nothing. Any exception is logged
    and swallowed so a failed run does not propagate out of the job.
    """
    try:
        from download import dropout
        from routes.api import get_show_data, get_latest_season

        logger.info(f"Running scheduled download for latest season of {show}")
        show_data = await get_show_data(show, force=False)
        if not show_data:
            logger.warning(f"Scheduled latest download skipped: no show data for {show}")
            return

        season = get_latest_season(show_data)
        if not season:
            logger.warning(f"Scheduled latest download skipped: no latest season found for {show}")
            return

        # NOTE(review): called without await; if this is a blocking call it
        # will hold the event loop for the whole download — confirm.
        dropout.show(show, season)
    except Exception as e:
        logger.error(f"Scheduled latest download failed: {e}")
async def _task_update_series():
    """Scheduled task: refresh the series list without forcing a re-download.

    Calls ``dropout.series(force_download=False)``. Any exception is logged
    and swallowed so a failed run does not propagate out of the job.
    """
    try:
        from download import dropout

        logger.info("Running scheduled series update")
        force = False
        # NOTE(review): called without await inside a coroutine; if this is a
        # blocking call it will hold the event loop while it runs — confirm.
        dropout.series(force_download=force)
    except Exception as exc:
        logger.error(f"Scheduled series update failed: {exc}")
async def _task_update_posters():
    """Scheduled task: refresh the series list with a forced re-download.

    Calls ``dropout.series(force_download=True)``. Any exception is logged
    and swallowed so a failed run does not propagate out of the job.
    """
    try:
        from download import dropout

        logger.info("Running scheduled poster update")
        force = True
        # NOTE(review): called without await inside a coroutine; if this is a
        # blocking call it will hold the event loop while it runs — confirm.
        dropout.series(force_download=force)
    except Exception as exc:
        logger.error(f"Scheduled poster update failed: {exc}")
# Field order of a standard crontab line, as consumed by CronTrigger.from_crontab.
_CRONTAB_FIELD_ORDER = ("minute", "hour", "day", "month", "day_of_week")


def _trigger_to_crontab(trigger) -> str:
    """Render a cron trigger as the five-field crontab string ``add_job`` parses.

    ``str(trigger)`` produces APScheduler's display form
    (``cron[month='*', ..., minute='0']``), which ``CronTrigger.from_crontab``
    cannot read back, so the expression is rebuilt from the trigger's
    individual fields instead. Triggers that do not expose cron fields fall
    back to ``str(trigger)``.
    """
    try:
        by_name = {field.name: str(field) for field in trigger.fields}
        return " ".join(by_name[name] for name in _CRONTAB_FIELD_ORDER)
    except (AttributeError, KeyError, TypeError):
        return str(trigger)


def _save_jobs():
    """Write the currently scheduled jobs to ``JOBS_FILE`` as JSON.

    Each job is stored as ``{"id", "task", "cron", "kwargs"}`` where ``cron``
    is a crontab expression that ``add_job`` can parse again on restore.
    Errors are logged and swallowed; persistence failure never breaks the
    caller.
    """
    try:
        sched = get_scheduler()
        jobs_data = [
            {
                "id": job.id,
                "task": _extract_task_name(job.func.__name__),
                "cron": _trigger_to_crontab(job.trigger),
                "kwargs": job.kwargs,
            }
            for job in sched.get_jobs()
        ]
        # Serialize before opening the file: if a kwarg is not JSON-encodable
        # the existing file is left intact instead of being truncated.
        payload = json.dumps(jobs_data, indent=4)
        with open(JOBS_FILE, 'w') as f:
            f.write(payload)
        logger.debug(f"Saved {len(jobs_data)} jobs to persistent storage")
    except Exception as e:
        logger.error(f"Error saving jobs: {e}")
def _extract_task_name(func_name: str) -> str:
"""Extract task name from function name."""
if func_name == "_task_download_show":
return "download_show"
elif func_name == "_task_download_latest":
return "download_latest"
elif func_name == "_task_update_series":
return "update_series"
elif func_name == "_task_update_posters":
return "update_posters"
return func_name