230 lines
6.9 KiB
Python
230 lines
6.9 KiB
Python
"""Background task scheduler using APScheduler."""
|
|
|
|
import logging
|
|
import json
|
|
from typing import Dict, List, Optional, Any
|
|
from datetime import datetime
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
import config
|
|
|
|
# Module logger, registered under the fixed name "syllabus".
logger = logging.getLogger("syllabus")

# Global scheduler instance; stays None until get_scheduler() or
# init_scheduler() assigns an AsyncIOScheduler to it.
scheduler: Optional[AsyncIOScheduler] = None

# Jobs storage file: JSON document read by init_scheduler() and rewritten
# by _save_jobs().
JOBS_FILE = config.DATA_DIR / "scheduled_jobs.json"
|
|
|
|
|
|
def get_scheduler() -> AsyncIOScheduler:
    """Return the module-wide scheduler, creating it on first use.

    The instance is created lazily; this function does not start it.
    """
    global scheduler
    if scheduler is not None:
        return scheduler
    scheduler = AsyncIOScheduler()
    return scheduler
|
|
|
|
|
|
async def init_scheduler():
    """Create a fresh global scheduler, restore persisted jobs and start it.

    When ``JOBS_FILE`` exists it is parsed as JSON and every entry is
    re-registered through ``add_job`` using its ``id``, ``task``, ``cron``
    and ``kwargs`` keys. Problems reading the file or restoring a single
    entry are logged and never prevent the scheduler from starting.
    """
    global scheduler
    scheduler = AsyncIOScheduler()

    # Load previously saved jobs
    if JOBS_FILE.exists():
        try:
            with open(JOBS_FILE, 'r') as f:
                jobs = json.load(f)

            for job_data in jobs:
                # A hand-edited or corrupted file may contain non-object
                # entries; skip them instead of aborting the whole restore.
                if not isinstance(job_data, dict):
                    logger.error(f"Skipping malformed job entry: {job_data!r}")
                    continue

                job_id = job_data.get('id')
                try:
                    restored = add_job(
                        job_id=job_id,
                        task=job_data.get('task'),
                        cron=job_data.get('cron'),
                        kwargs=job_data.get('kwargs', {})
                    )
                except Exception as e:
                    logger.error(f"Failed to restore job {job_id}: {e}")
                    continue

                # add_job reports failure through its return value rather
                # than by raising, so the result must be checked before
                # claiming the job was restored.
                if restored:
                    logger.info(f"Restored job: {job_id}")
                else:
                    logger.error(f"Failed to restore job {job_id}")
        except Exception as e:
            logger.error(f"Failed to load scheduled jobs: {e}")

    scheduler.start()
    logger.info("Scheduler initialized and started")
|
|
|
|
|
|
def shutdown_scheduler():
    """Stop the global scheduler if one exists and is currently running."""
    global scheduler
    # Nothing to do when no scheduler was created or it is not running.
    if not scheduler or not scheduler.running:
        return
    scheduler.shutdown()
    logger.info("Scheduler shutdown")
|
|
|
|
|
|
def add_job(
    job_id: str,
    task: str,
    cron: str,
    kwargs: Dict[str, Any] = None
) -> bool:
    """Register a cron-scheduled job, replacing any job with the same id.

    Args:
        job_id: Identifier of the job in the scheduler.
        task: Task name resolved through ``_get_task_function``.
        cron: Crontab expression (minute hour day month day_of_week),
            e.g. ``"0 2 * * *"`` for daily at 2 AM.
        kwargs: Keyword arguments passed to the task; ``None`` means none.

    Returns:
        ``True`` when the job was added and the job list saved, ``False``
        when the task name is unknown or any error occurred (errors are
        logged, not raised).
    """
    try:
        sched = get_scheduler()
        job_kwargs = kwargs if kwargs else {}

        # Resolve the task name to its callable
        handler = _get_task_function(task)
        if not handler:
            logger.error(f"Unknown task: {task}")
            return False

        cron_trigger = CronTrigger.from_crontab(cron)

        # Drop a previously registered job with the same id first
        existing = sched.get_job(job_id)
        if existing:
            sched.remove_job(job_id)

        sched.add_job(
            handler,
            trigger=cron_trigger,
            id=job_id,
            kwargs=job_kwargs,
            replace_existing=True,
        )

        logger.info(f"Added job {job_id}: {task} with cron {cron}")
        _save_jobs()
        return True
    except Exception as exc:
        logger.error(f"Error adding job {job_id}: {exc}")
        return False
|
|
|
|
|
|
def remove_job(job_id: str) -> bool:
    """Remove a scheduled job by id and persist the updated job list.

    Returns:
        ``True`` when the job existed and was removed, ``False`` when no
        such job exists or an error occurred (errors are logged).
    """
    try:
        sched = get_scheduler()
        if not sched.get_job(job_id):
            return False

        sched.remove_job(job_id)
        logger.info(f"Removed job {job_id}")
        _save_jobs()
        return True
    except Exception as exc:
        logger.error(f"Error removing job {job_id}: {exc}")
        return False
|
|
|
|
|
|
def get_jobs() -> List[Dict[str, Any]]:
    """Return a description of every job known to the scheduler.

    Returns:
        A list of dicts with the keys ``job_id``, ``name``, ``trigger``
        (string form of the trigger), ``next_run_time`` (ISO 8601 string or
        ``None``) and ``kwargs``. An empty list is returned if listing
        fails; the error is logged.
    """
    try:
        scheduler = get_scheduler()
        jobs = []
        for job in scheduler.get_jobs():
            # APScheduler leaves ``next_run_time`` unset on jobs that are
            # still pending (scheduler not started yet); reading the
            # attribute directly would raise and wipe out the whole listing.
            next_run = getattr(job, "next_run_time", None)
            jobs.append({
                "job_id": job.id,
                "name": job.name,
                "trigger": str(job.trigger),
                "next_run_time": next_run.isoformat() if next_run else None,
                "kwargs": job.kwargs
            })
        return jobs
    except Exception as e:
        logger.error(f"Error getting jobs: {e}")
        return []
|
|
|
|
|
|
def _get_task_function(task_name: str):
    """Resolve a task name to its task coroutine, or ``None`` if unknown."""
    registry = dict(
        download_show=_task_download_show,
        download_latest=_task_download_latest,
        update_series=_task_update_series,
        update_posters=_task_update_posters,
    )
    return registry.get(task_name)
|
|
|
|
|
|
async def _task_download_show(show: str, season: int, specials: bool = False):
    """Scheduled task: download one season of a show.

    Forwards ``show``, ``season`` and ``specials`` to ``dropout.show``.
    Any exception is logged and swallowed so the scheduler keeps running.
    """
    try:
        # Imported at call time rather than at module import.
        from download import dropout

        logger.info(f"Running scheduled download for {show} season {season}")
        # NOTE(review): dropout.show is called without await inside a
        # coroutine; if it is a blocking call it will hold the event loop
        # for the duration of the download - confirm this is intended.
        dropout.show(show, season, specials)
    except Exception as exc:
        logger.error(f"Scheduled download failed: {exc}")
|
|
|
|
|
|
async def _task_download_latest(show: str):
    """Scheduled task: download the latest season of a show.

    Looks up the show data with ``get_show_data`` (``force=False``), asks
    ``get_latest_season`` for the season and passes it to ``dropout.show``.
    Does nothing when either lookup yields a falsy result. Any exception is
    logged and swallowed.
    """
    try:
        from download import dropout
        from routes.api import get_show_data, get_latest_season

        logger.info(f"Running scheduled download for latest season of {show}")

        show_data = await get_show_data(show, force=False)
        if not show_data:
            return

        season = get_latest_season(show_data)
        if not season:
            return

        dropout.show(show, season)
    except Exception as exc:
        logger.error(f"Scheduled latest download failed: {exc}")
|
|
|
|
|
|
async def _task_update_series():
    """Scheduled task: refresh the series list.

    Calls ``dropout.series`` with ``force_download=False``. Any exception
    is logged and swallowed.
    """
    try:
        from download import dropout

        logger.info("Running scheduled series update")
        dropout.series(force_download=False)
    except Exception as exc:
        logger.error(f"Scheduled series update failed: {exc}")
|
|
|
|
|
|
async def _task_update_posters():
    """Scheduled task: force an update of all show posters.

    Calls ``dropout.series`` with ``force_download=True``. Any exception is
    logged and swallowed.
    """
    try:
        from download import dropout

        logger.info("Running scheduled poster update")
        dropout.series(force_download=True)
    except Exception as exc:
        logger.error(f"Scheduled poster update failed: {exc}")
|
|
|
|
|
|
def _trigger_to_crontab(trigger: Any) -> str:
    """Render a cron trigger as a five-field crontab expression.

    The result uses the ``minute hour day month day_of_week`` order that
    ``CronTrigger.from_crontab`` accepts, so a saved job can be restored.
    Falls back to ``str(trigger)`` when the trigger does not expose the
    expected named fields.
    """
    crontab_order = ("minute", "hour", "day", "month", "day_of_week")
    fields = getattr(trigger, "fields", None)
    if not fields:
        return str(trigger)

    # Each field object carries its name and stringifies to its expression.
    expressions = {field.name: str(field) for field in fields}
    try:
        return " ".join(expressions[name] for name in crontab_order)
    except KeyError:
        return str(trigger)


def _save_jobs():
    """Save current jobs to persistent storage.

    Writes one JSON object per scheduled job to ``JOBS_FILE`` with the keys
    ``id``, ``task``, ``cron`` and ``kwargs``. Errors are logged, not raised.
    """
    try:
        scheduler = get_scheduler()
        jobs_data = [
            {
                "id": job.id,
                "task": _extract_task_name(job.func.__name__),
                # str(job.trigger) yields "cron[...]", which cannot be fed
                # back to CronTrigger.from_crontab on restore; store a real
                # crontab expression instead.
                "cron": _trigger_to_crontab(job.trigger),
                "kwargs": job.kwargs,
            }
            for job in scheduler.get_jobs()
        ]

        # Serialize before opening the file so a non-serializable value
        # cannot leave a truncated jobs file behind.
        payload = json.dumps(jobs_data, indent=4)
        with open(JOBS_FILE, 'w') as f:
            f.write(payload)

        logger.debug(f"Saved {len(jobs_data)} jobs to persistent storage")
    except Exception as e:
        logger.error(f"Error saving jobs: {e}")
|
|
|
|
|
|
def _extract_task_name(func_name: str) -> str:
    """Map a task coroutine's function name back to its task name.

    Names that are not one of the known task functions are returned
    unchanged.
    """
    known_names = {
        "_task_download_show": "download_show",
        "_task_download_latest": "download_latest",
        "_task_update_series": "update_series",
        "_task_update_posters": "update_posters",
    }
    return known_names.get(func_name, func_name)
|