Compare commits


No commits in common. "AI" and "main" have entirely different histories.
AI ... main

22 changed files with 572 additions and 1696 deletions


@@ -1,20 +0,0 @@
# Environment Configuration
# Copy this file to .env and update with your actual values
# Data directories
DATA_DIR=/data
TEMP_DIR=/temp
# Logging
LOG_LEVEL=DEBUG
# Cache settings (TTL in seconds)
CACHE_TTL=3600
# Audio quality
AUDIO_QUALITY=192
# API settings
HOST=0.0.0.0
PORT=8000
DEBUG=False

.vscode/launch.json vendored

@@ -1,24 +1,21 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "FastAPI Server",
"type": "python",
"request": "launch",
"module": "uvicorn",
"args": [
"main:app",
"--reload",
"--host",
"0.0.0.0",
"--port",
"8000"
],
"jinja": true,
"cwd": "${workspaceFolder}/app",
"console": "integratedTerminal"
{
"name": "FastAPI: Uvicorn",
"type": "debugpy",
"request": "launch",
"module": "uvicorn",
"args": [
"app.main:app",
"--reload"
],
"jinja": true,
"justMyCode": true,
"env": {
"PYTHONPATH": "${workspaceFolder}/app",
}
}
]
}
}

.vscode/tasks.json vendored

@@ -1,43 +0,0 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "Run FastAPI Server",
"type": "shell",
"command": "uvicorn",
"args": [
"main:app",
"--reload",
"--host",
"0.0.0.0",
"--port",
"8000"
],
"options": {
"cwd": "${workspaceFolder}/app"
},
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "new"
},
"runOptions": {
"runOn": "folderOpen"
}
},
{
"label": "Install Requirements",
"type": "shell",
"command": "pip",
"args": [
"install",
"-r",
"requirements.txt"
],
"options": {
"cwd": "${workspaceFolder}"
}
}
]
}

app/__init__.py

@@ -1,8 +0,0 @@
"""Application package."""
__version__ = "1.0.0"
__author__ = "Tyler"
from core.app import create_app
__all__ = ["create_app"]

app/config.py

@@ -1,67 +0,0 @@
import os
from pathlib import Path
from typing import Optional
# Base directories
BASE_DIR = Path(__file__).parent.parent
DATA_DIR = Path(os.getenv("DATA_DIR", BASE_DIR / "data"))
TEMP_DIR = Path(os.getenv("TEMP_DIR", BASE_DIR / "temp"))
# Logging configuration
LOG_DIR = DATA_DIR / "logs"
LOG_FILE = LOG_DIR / "syllabus.log"
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG")
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
# Dropout configuration
DROPOUT_COOKIES = DATA_DIR / "dropout.cookies.txt"
DROPOUT_ARCHIVE = LOG_DIR / "dropout.archive.log"
DROPOUT_JSON = DATA_DIR / "dropout.json"
DROPOUT_BASE_URL = "https://watch.dropout.tv"
DROPOUT_POSTER_BASE_URL = os.getenv("DROPOUT_POSTER_BASE_URL", "https://vhx.imgix.net/chuncensoredstaging/assets/")
# YouTube configuration
YOUTUBE_COOKIES = DATA_DIR / "youtube.cookies.txt"
YOUTUBE_ARCHIVE = LOG_DIR / "youtube.archive.log"
# Media directories
TV_DIR = DATA_DIR / "tv"
YOUTUBE_DIR = DATA_DIR / "youtube"
PODCASTS_DIR = DATA_DIR / "podcasts"
ASMR_DIR = DATA_DIR / "asmr"
NSFW_DIR = DATA_DIR / "nsfw"
POSTERS_DIR = DATA_DIR / "posters"
# Download settings
AUDIO_QUALITY = os.getenv("AUDIO_QUALITY", "192")
DEFAULT_FORMAT = "bestvideo+bestaudio/best"
AUDIO_FORMAT = "bestaudio/best[ext=mp3]"
# Cache settings
CACHE_TTL = int(os.getenv("CACHE_TTL", "3600")) # 1 hour in seconds
# Web UI
TEMPLATES_DIR = BASE_DIR / "app" / "templates"
STATIC_DIR = BASE_DIR / "app" / "static"
# API settings
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "8000"))
DEBUG = os.getenv("DEBUG", "False").lower() == "true"
def ensure_directories():
"""Create all required directories if they don't exist."""
directories = [
DATA_DIR,
TEMP_DIR,
LOG_DIR,
TV_DIR,
YOUTUBE_DIR,
PODCASTS_DIR,
ASMR_DIR,
NSFW_DIR,
POSTERS_DIR,
]
for directory in directories:
directory.mkdir(parents=True, exist_ok=True)

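Every value above is read from the environment at import time, so overrides must be in place before importing config. A minimal sketch, with illustrative values:

# Illustrative override; the path and TTL here are examples, not defaults.
import os
os.environ["DATA_DIR"] = "/mnt/media"   # hypothetical mount point
os.environ["CACHE_TTL"] = "7200"

import config  # environment variables are read here, at import time
config.ensure_directories()
print(config.DATA_DIR, config.CACHE_TTL)  # /mnt/media 7200
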
app/core/__init__.py

@@ -1,8 +0,0 @@
"""Core module exports."""
__all__ = [
"setup_logger",
"CacheManager",
"series_cache",
"create_app",
]

app/core/app.py

@@ -1,69 +0,0 @@
"""FastAPI application factory."""
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import logging
import config
import os
from core.logging import setup_logger
def create_app() -> FastAPI:
"""Create and configure FastAPI application."""
# Setup logging
logger = setup_logger()
logger.info("Creating FastAPI application")
# Create app
app = FastAPI(
title="Syllabus",
description="Media downloader for Dropout, YouTube, and more",
version="1.0.0"
)
# Mount static files
app.mount("/data", StaticFiles(directory=str(config.DATA_DIR)), name="data")
# Setup templates
app.templates = Jinja2Templates(directory=str(config.TEMPLATES_DIR))
# Setup startup event to load series data
@app.on_event("startup")
async def startup_event():
"""Load series data from cache on startup."""
logger.info("Running startup initialization")
# Load existing dropout.json if it exists
if os.path.exists(config.DROPOUT_JSON):
try:
from core.cache import series_cache
series_cache.load_from_file(str(config.DROPOUT_JSON))
logger.info("Series data loaded from cache")
except Exception as e:
logger.error(f"Failed to load series cache: {e}")
else:
logger.info("dropout.json not found - users must manually trigger /api/dropout/update or use HTML upload")
# Setup middleware
@app.middleware("http")
async def log_requests(request, call_next):
"""Log all incoming HTTP requests."""
try:
response = await call_next(request)
except Exception as e:
logger.exception(f"EXCEPTION: {request.method} {request.url} - {str(e)}")
from fastapi.responses import JSONResponse
return JSONResponse(
status_code=500,
content={"detail": "Internal Server Error"},
)
logger.info(
f"request_client={request.client.host}:{request.client.port}, "
f"request_method={request.method}, request_url={request.url}, "
f"status_code={response.status_code}"
)
return response
return app

app/core/cache.py

@@ -1,73 +0,0 @@
"""Cache management with TTL support."""
import json
import time
import logging
from typing import Dict, Any, Optional
import config
logger = logging.getLogger("syllabus")
class CacheManager:
"""Manage application cache with TTL support."""
def __init__(self, ttl: int = None):
"""Initialize cache manager with optional TTL."""
self.ttl = ttl or config.CACHE_TTL
self.data: Optional[Dict[str, Any]] = None
self.timestamp: Optional[float] = None
def is_valid(self) -> bool:
"""Check if cache is still valid."""
if self.data is None or self.timestamp is None:
return False
elapsed = time.time() - self.timestamp
return elapsed < self.ttl
def get(self) -> Optional[Dict[str, Any]]:
"""Get cached data if valid, None otherwise."""
if self.is_valid():
return self.data
return None
def set(self, data: Dict[str, Any]) -> None:
"""Store data in cache with current timestamp."""
self.data = data
self.timestamp = time.time()
logger.debug(f"Cache updated with {len(data) if isinstance(data, list) else 'data'}")
def clear(self) -> None:
"""Clear the cache."""
self.data = None
self.timestamp = None
logger.debug("Cache cleared")
def load_from_file(self, filepath: str) -> Optional[Dict[str, Any]]:
"""Load data from JSON file and cache it."""
try:
with open(filepath, 'r') as f:
data = json.load(f, object_pairs_hook=dict)
self.set(data)
return data
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Failed to load cache from {filepath}: {e}")
return None
def save_to_file(self, filepath: str) -> bool:
"""Save cached data to JSON file."""
if self.data is None:
logger.warning("No data to save to cache file")
return False
try:
with open(filepath, 'w') as f:
json.dump(self.data, f, indent=4)
logger.debug(f"Cache saved to {filepath}")
return True
except IOError as e:
logger.error(f"Failed to save cache to {filepath}: {e}")
return False
# Global cache instance
series_cache = CacheManager()

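A short usage sketch of the CacheManager above; the TTL and data are illustrative:

# Illustrative use of core.cache.CacheManager.
from core.cache import CacheManager

cache = CacheManager(ttl=5)            # entries expire after 5 seconds
cache.set({"shows": ["Sample Show"]})  # sample data
assert cache.is_valid()
print(cache.get())                     # {'shows': ['Sample Show']}
# Once the TTL elapses, get() returns None and callers reload, e.g.:
# cache.load_from_file("/data/dropout.json")
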
app/core/logging.py

@@ -1,32 +0,0 @@
"""Logging configuration for the application."""
import logging
from logging.handlers import TimedRotatingFileHandler
import config
def setup_logger() -> logging.Logger:
"""Configure and return the application logger."""
config.ensure_directories()
logger = logging.getLogger("syllabus")
logger.setLevel(config.LOG_LEVEL)
# Remove any default handlers
logger.handlers = []
# Set up TimedRotatingFileHandler
handler = TimedRotatingFileHandler(
filename=str(config.LOG_FILE),
when="midnight",
interval=30,
backupCount=12,
encoding="utf-8",
utc=False
)
formatter = logging.Formatter(config.LOG_FORMAT)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger

app/core/scheduler.py

@@ -1,229 +0,0 @@
"""Background task scheduler using APScheduler."""
import logging
import json
from typing import Dict, List, Optional, Any
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import config
logger = logging.getLogger("syllabus")
# Global scheduler instance
scheduler: Optional[AsyncIOScheduler] = None
# Jobs storage file
JOBS_FILE = config.DATA_DIR / "scheduled_jobs.json"
# Original cron expressions by job id; str(job.trigger) is not a valid
# crontab string, so keep the raw expression for persistence/restore
_job_crons: Dict[str, str] = {}
def get_scheduler() -> AsyncIOScheduler:
"""Get or create the global scheduler instance."""
global scheduler
if scheduler is None:
scheduler = AsyncIOScheduler()
return scheduler
async def init_scheduler():
"""Initialize the scheduler and load saved jobs."""
global scheduler
scheduler = AsyncIOScheduler()
# Load previously saved jobs
if JOBS_FILE.exists():
try:
with open(JOBS_FILE, 'r') as f:
jobs = json.load(f, object_pairs_hook=dict)
for job_data in jobs:
try:
add_job(
job_id=job_data.get('id'),
task=job_data.get('task'),
cron=job_data.get('cron'),
kwargs=job_data.get('kwargs', {})
)
logger.info(f"Restored job: {job_data.get('id')}")
except Exception as e:
logger.error(f"Failed to restore job {job_data.get('id')}: {e}")
except Exception as e:
logger.error(f"Failed to load scheduled jobs: {e}")
scheduler.start()
logger.info("Scheduler initialized and started")
def shutdown_scheduler():
"""Shutdown the scheduler gracefully."""
global scheduler
if scheduler and scheduler.running:
scheduler.shutdown()
logger.info("Scheduler shutdown")
def add_job(
job_id: str,
task: str,
cron: str,
kwargs: Dict[str, Any] = None
) -> bool:
"""Add a scheduled job with cron expression."""
try:
scheduler = get_scheduler()
kwargs = kwargs or {}
# Import task functions
task_func = _get_task_function(task)
if not task_func:
logger.error(f"Unknown task: {task}")
return False
# Parse cron expression (format: minute hour day month day_of_week)
# Example: "0 2 * * *" = daily at 2 AM
trigger = CronTrigger.from_crontab(cron)
# Remove existing job if it exists
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
scheduler.add_job(
task_func,
trigger=trigger,
id=job_id,
kwargs=kwargs,
replace_existing=True
)
logger.info(f"Added job {job_id}: {task} with cron {cron}")
_save_jobs()
return True
except Exception as e:
logger.error(f"Error adding job {job_id}: {e}")
return False
def remove_job(job_id: str) -> bool:
"""Remove a scheduled job."""
try:
scheduler = get_scheduler()
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
_job_crons.pop(job_id, None)
logger.info(f"Removed job {job_id}")
_save_jobs()
return True
return False
except Exception as e:
logger.error(f"Error removing job {job_id}: {e}")
return False
def get_jobs() -> List[Dict[str, Any]]:
"""Get list of all scheduled jobs."""
try:
scheduler = get_scheduler()
jobs = []
for job in scheduler.get_jobs():
jobs.append({
"job_id": job.id,
"name": job.name,
"trigger": str(job.trigger),
"next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
"kwargs": job.kwargs
})
return jobs
except Exception as e:
logger.error(f"Error getting jobs: {e}")
return []
def _get_task_function(task_name: str):
"""Get the task function by name."""
tasks = {
"download_show": _task_download_show,
"download_latest": _task_download_latest,
"update_series": _task_update_series,
"update_posters": _task_update_posters,
}
return tasks.get(task_name)
async def _task_download_show(show: str, season: int, specials: bool = False):
"""Task: Download a specific show season."""
try:
from download import dropout
logger.info(f"Running scheduled download for {show} season {season}")
dropout.show(show, season, specials)
except Exception as e:
logger.error(f"Scheduled download failed: {e}")
async def _task_download_latest(show: str):
"""Task: Download latest season of a show."""
try:
from download import dropout
from routes.api import get_show_data, get_latest_season
logger.info(f"Running scheduled download for latest season of {show}")
show_data = await get_show_data(show, force=False)
if show_data:
season = get_latest_season(show_data)
if season:
dropout.show(show, season)
except Exception as e:
logger.error(f"Scheduled latest download failed: {e}")
async def _task_update_series():
"""Task: Update series list."""
try:
from download import dropout
logger.info("Running scheduled series update")
dropout.series(force_download=False)
except Exception as e:
logger.error(f"Scheduled series update failed: {e}")
async def _task_update_posters():
"""Task: Force update all show posters."""
try:
from download import dropout
logger.info("Running scheduled poster update")
dropout.series(force_download=True)
except Exception as e:
logger.error(f"Scheduled poster update failed: {e}")
def _save_jobs():
"""Save current jobs to persistent storage."""
try:
scheduler = get_scheduler()
jobs_data = []
for job in scheduler.get_jobs():
jobs_data.append({
"id": job.id,
"task": _extract_task_name(job.func.__name__),
"cron": str(job.trigger),
"kwargs": job.kwargs
})
with open(JOBS_FILE, 'w') as f:
json.dump(jobs_data, f, indent=4)
logger.debug(f"Saved {len(jobs_data)} jobs to persistent storage")
except Exception as e:
logger.error(f"Error saving jobs: {e}")
def _extract_task_name(func_name: str) -> str:
"""Extract task name from function name."""
if func_name == "_task_download_show":
return "download_show"
elif func_name == "_task_download_latest":
return "download_latest"
elif func_name == "_task_update_series":
return "update_series"
elif func_name == "_task_update_posters":
return "update_posters"
return func_name

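For reference, a minimal sketch of driving the module above; the job id is made up and the cron string follows the crontab format documented in add_job:

# Illustrative only: schedule a nightly series update at 2 AM.
import asyncio
from core import scheduler

async def main():
    await scheduler.init_scheduler()
    scheduler.add_job(
        job_id="nightly-series-update",  # hypothetical id
        task="update_series",
        cron="0 2 * * *",                # minute hour day month day_of_week
    )
    print(scheduler.get_jobs())

asyncio.run(main())
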
app/download.py Normal file

@@ -0,0 +1,294 @@
import os, yt_dlp, json, requests, re, logging
from bs4 import BeautifulSoup
from urllib.parse import urlsplit
logger = logging.getLogger("syllabus")
# Global or outer-scope tracking dictionary
last_logged_percent = {}
def my_hook(d):  # logging hook for yt_dlp download progress
status = d.get('status')
filename = d.get('filename')
if status == 'downloading':
total_bytes = d.get('total_bytes') or d.get('total_bytes_estimate')
downloaded = d.get('downloaded_bytes', 0)
if total_bytes and filename:
percent = int(downloaded / total_bytes * 100)
current_value = last_logged_percent.get(filename, -10)
if percent >= current_value + 10:
last_logged_percent[filename] = (percent // 10) * 10
logger.info(
f"Downloading: {d.get('_percent_str')} at {d.get('_speed_str')} for {filename}"
)
elif status == 'finished':
logger.info(f"Download completed: {filename}")
# Optionally reset or clean up
last_logged_percent.pop(filename, None)
elif status == 'error':
logger.error(f"Error occurred: {d.get('error')}")
elif status == 'postprocessing':
logger.info(f"Post-processing: {filename}")
elif status == 'processing':
logger.info(f"Processing: {filename}")
# def ebook(url, author):
# destination = f"/ebooks/{author}"
# os.makedirs(destination, exist_ok=True) # Create the folder if it doesn't exist
# response = requests.get(url, stream=True)
# response.raise_for_status() # Good practice to raise error on bad status
# # Try to extract filename from the headers
# cd = response.headers.get('Content-Disposition')
# if cd and 'filename=' in cd:
# filename = cd.split('filename=')[1].strip('";')
# else:
# # Fallback: get the last part of the URL
# filename = os.path.basename(url)
# file_path = os.path.join(destination, filename)
# with open(file_path, 'wb') as f:
# for chunk in response.iter_content(chunk_size=8192):
# f.write(chunk)
class grab:
def season(url):
page_html=requests.get(url)
soup = BeautifulSoup(page_html.text, 'html.parser')
select_element = soup.find('select', class_='js-switch-season')
options = select_element.find_all('option')
option_values = [option['value'] for option in options if option.has_attr('value')]
seasons = [item.replace(url+'/season:', '') for item in option_values]
return seasons
def poster(url, name, force_download, save_dir='/data/posters/'):
# Use alt for filename if available, fallback to a generic name
alt_value = name
path = urlsplit(url).path
ext = os.path.splitext(path)[-1] or '.jpeg'
safe_name = re.sub(r'[^a-zA-Z0-9\s]', '', alt_value).replace(' ', '_')
filename = f"{safe_name}{ext}"
filepath = os.path.join(save_dir, filename)
if not os.path.exists(filepath) or force_download:
os.makedirs(save_dir, exist_ok=True)
img_data = requests.get(url).content
with open(filepath, 'wb') as handler:
handler.write(img_data)
return filepath
def thumbnail(ydl,url,location):
# Extracting video information
video_info = ydl.extract_info(url, download=False)
thumbnail_url = video_info.get('thumbnail')
# Download the thumbnail image
if thumbnail_url:
try:
thumbnail_filename = os.path.join(location, f"{video_info['id']}.jpg")
with open(thumbnail_filename, 'wb') as thumbnail_file:
thumbnail_file.write(requests.get(thumbnail_url).content)
print("Downloaded MP4 and downloaded thumbnail successfully!")
except Exception as e:
print(f"Error downloading thumbnail: {str(e)}")
else:
print("Downloaded MP4 but no thumbnail found.")
class ArchiveOnlyYDL(yt_dlp.YoutubeDL):
def process_info(self, info_dict):
# """Pretend the video was downloaded successfully, so archive is updated."""
self.record_download_archive(info_dict)
self.to_screen(f"Archived: {info_dict.get('title')}")
return info_dict
class dropout:
def archive(show, season):
with open('/data/dropout.json', 'r') as json_file:
url_mapping = json.load(json_file)
url = next((item['URL'] for item in url_mapping if item['SHOW'] == show), None)
if url is None:
raise ValueError(f"Show '{show}' not found in the JSON data.")
playlist_url = f'{url}/season:{season}'
dl_opts = {
'quiet': True,
'cookiefile': '/data/dropout.cookies.txt',
'download_archive': '/data/logs/dropout.archive.log',
'skip_download': True, # Prevent actual downloads
}
with ArchiveOnlyYDL(dl_opts) as ydl:
ydl.download([playlist_url])
def custom(url, directory, prefix):
filename_template = f"{prefix}%(title)s.%(ext)s" if prefix else "%(title)s.%(ext)s"
dl_opts = {
'progress_hooks': [my_hook],
'download_archive': '/data/logs/dropout.archive.log',
'format': 'bestvideo+bestaudio/best',
'audio_quality': '256K',
'paths': {
'temp': '/temp',
'home': directory,
},
'cookiefile': '/data/dropout.cookies.txt',
'writesubtitles': True,
'subtitleslangs': ['en'],
'outtmpl': filename_template,
}
with yt_dlp.YoutubeDL(dl_opts) as ydl:
ydl.download([url] if isinstance(url, str) else url)
def show(show, season, specials=False, episode_start=None):
season_str = f"{int(season):02}" if not specials else "00"
directory = f"/tv/{show}/{'Specials' if specials else f'Season {season}'}"
os.makedirs(directory, exist_ok=True)
with open('/data/dropout.json', 'r') as json_file:
url_mapping = json.load(json_file)
url = next((item['URL'] for item in url_mapping if item['SHOW'] == show), None)
if url is None:
raise ValueError(f"Show '{show}' not found in the JSON data.")
playlist_url = f'{url}/season:{season}'
# Match filter logic
filter_pattern = (
"title "
f"{'~=' if specials else '!~='} "
r"'(?i).*behind.?the.?scenes.*"
r"|.*trailer.*"
r"|.*recap.*"
r"|.*last.looks.*'"
)
match_filter = yt_dlp.utils.match_filter_func(filter_pattern)
ydl_opts = {
'quiet': True,
'skip_download': True,
'cookiefile': '/data/dropout.cookies.txt',
}
# Extract playlist info
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
playlist_info = ydl.extract_info(playlist_url, download=False)
entries = playlist_info.get('entries', [])
filtered_entries = [entry for entry in entries if match_filter(entry) is None]
episode_start = int(episode_start) if episode_start else 1
for i, entry in enumerate(filtered_entries, start=episode_start):
episode_number = f"{i:02}"
filename_template = f"{show} - S{season_str}E{episode_number} - %(title)s.%(ext)s"
dl_opts = {
'progress_hooks': [my_hook],
'download_archive': '/data/logs/dropout.archive.log',
'format': 'bestvideo+bestaudio/best',
'audio_quality': '256K',
'paths': {
'temp': '/temp',
'home': directory
},
'cookiefile': '/data/dropout.cookies.txt',
'writesubtitles': True,
'subtitleslangs': ['en'],
'outtmpl': filename_template,
}
with yt_dlp.YoutubeDL(dl_opts) as ydl:
ydl.download([entry['webpage_url']])
def series(force_download):
json_data=[]
html=requests.get('https://watch.dropout.tv/series').text
# If you want to parse the HTML
soup = BeautifulSoup(html, 'html.parser')
elements = soup.find_all('a', class_='browse-item-link')
shows = []
for element in elements:
show_data = {}
show_data['href'] = element.get('href', '')
img = element.find('img')
if img:
show_data['src'] = img.get('src', '')
show_data['alt'] = img.get('alt', '')
shows.append(show_data)
# Now 'shows' is a list of dicts, so this works:
for show in shows:
info_data = {}
info_data['SHOW'] = show.get('alt', 'No title')
info_data['URL'] = show.get('href', 'No link')
info_data['LINK'] = re.sub(r".*dropout.tv/", "", show.get('href', ''))
info_data['POSTER'] = grab.poster(show.get('src', ''), show.get('alt', ''),force_download)
json_data.append(info_data)
# Sort the json_data by the 'SHOW' key
# sorted_json_data = sorted(json_data, key=lambda x: x['SHOW'])
with open('/data/dropout.json', 'w') as json_file:
json.dump(json_data, json_file, indent=4)
class youtube:
def ydl(url, location):
logger.info(f'message=Received download request for {url}.')
dl_ops = {
'progress_hooks': [my_hook],
'download_archive': '/data/logs/youtube.archive.log',
'paths': {
'temp': '/temp',
'home': location
},
'outtmpl': '%(uploader)s/%(title)s.%(ext)s'
}
if dl_ops['paths']['home'] == '/podcasts':
dl_ops['format'] = 'bestaudio/best[ext=mp3]'
dl_ops['postprocessors'] = [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}, {
'key': 'FFmpegMetadata',
'add_metadata': True,
}]
elif dl_ops['paths']['home'] == '/asmr':
dl_ops['format'] = 'bestaudio/best[ext=mp3]'
dl_ops['postprocessors'] = [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}, {
'key': 'FFmpegMetadata',
'add_metadata': True,
}]
elif dl_ops['paths']['home'] == '/youtube':
dl_ops['format'] = 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best'
dl_ops['cookiefile'] = '/data/youtube.cookies.txt'
else:
dl_ops['format'] = 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best'
with yt_dlp.YoutubeDL(dl_ops) as ydl:
ydl.download([url])
# grab.thumbnail(ydl,url,location)

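The title filter in dropout.show() above decides which playlist entries count as regular episodes; a sketch of its behavior, with invented sample titles:

# Sketch of the episode/specials title filter; sample entries are invented.
import yt_dlp

pattern = (
    "title !~= "
    r"'(?i).*behind.?the.?scenes.*|.*trailer.*|.*recap.*|.*last.looks.*'"
)
keep_episodes = yt_dlp.utils.match_filter_func(pattern)

# The returned callable yields None for entries that pass and a
# rejection-reason string for entries that match the exclusion regex.
print(keep_episodes({"title": "Episode 1"}))         # None -> kept
print(keep_episodes({"title": "Season 2 Trailer"}))  # reason -> skipped
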
app/download/__init__.py

@@ -1,13 +0,0 @@
"""Download module exports."""
from download.base import my_hook, ArchiveOnlyYDL, grab
from download.dropout import dropout
from download.youtube import youtube
__all__ = [
"my_hook",
"ArchiveOnlyYDL",
"grab",
"dropout",
"youtube",
]

app/download/base.py

@@ -1,139 +0,0 @@
"""Base classes and utilities for downloading."""
import os
import yt_dlp
import requests
import logging
from typing import Dict, Any, Optional
import config
logger = logging.getLogger("syllabus")
# Global or outer-scope tracking dictionary
last_logged_percent = {}
def my_hook(d: Dict[str, Any]) -> None:
"""Logging hook for yt_dlp download progress."""
status = d.get('status')
filename = d.get('filename')
if status == 'downloading':
total_bytes = d.get('total_bytes') or d.get('total_bytes_estimate')
downloaded = d.get('downloaded_bytes', 0)
if total_bytes and filename:
percent = int(downloaded / total_bytes * 100)
current_value = last_logged_percent.get(filename, -10)
if percent >= current_value + 10:
last_logged_percent[filename] = (percent // 10) * 10
logger.info(
f"Downloading: {d.get('_percent_str')} at {d.get('_speed_str')} for {filename}"
)
elif status == 'finished':
logger.info(f"Download completed: {filename}")
last_logged_percent.pop(filename, None)
elif status == 'error':
logger.error(f"Error occurred: {d.get('error')}")
elif status == 'postprocessing':
logger.info(f"Post-processing: {filename}")
elif status == 'processing':
logger.info(f"Processing: {filename}")
class ArchiveOnlyYDL(yt_dlp.YoutubeDL):
"""Custom YoutubeDL class that only updates archive without downloading."""
def process_info(self, info_dict: Dict[str, Any]) -> Dict[str, Any]:
"""Record download in archive without actually downloading."""
self.record_download_archive(info_dict)
self.to_screen(f"Archived: {info_dict.get('title')}")
return info_dict
class grab:
"""Utilities for grabbing media metadata and thumbnails."""
@staticmethod
def season(url: str) -> list:
"""Extract available seasons from a show URL."""
try:
from bs4 import BeautifulSoup
page_html = requests.get(url, timeout=10)
page_html.raise_for_status()
soup = BeautifulSoup(page_html.text, 'html.parser')
select_element = soup.find('select', class_='js-switch-season')
if not select_element:
logger.warning(f"Season select element not found for URL: {url}")
return []
options = select_element.find_all('option')
option_values = [option['value'] for option in options if option.has_attr('value')]
seasons = [item.replace(url + '/season:', '') for item in option_values]
return seasons
except requests.RequestException as e:
logger.error(f"Failed to fetch seasons from {url}: {e}")
return []
except Exception as e:
logger.error(f"Error parsing seasons: {e}")
return []
@staticmethod
def poster(url: str, name: str, force_download: bool, save_dir: Optional[str] = None) -> str:
"""Download and save a poster image."""
from urllib.parse import urlsplit
import re
if save_dir is None:
save_dir = str(config.POSTERS_DIR)
try:
alt_value = name
path = urlsplit(url).path
ext = os.path.splitext(path)[-1] or '.jpeg'
safe_name = re.sub(r'[^a-zA-Z0-9\s]', '', alt_value).replace(' ', '_')
filename = f"{safe_name}{ext}"
filepath = os.path.join(save_dir, filename)
if not os.path.exists(filepath) or force_download:
os.makedirs(save_dir, exist_ok=True)
response = requests.get(url, timeout=10)
response.raise_for_status()
with open(filepath, 'wb') as handler:
handler.write(response.content)
logger.debug(f"Downloaded poster to {filepath}")
# Return relative web path instead of absolute filesystem path
return f"/data/posters/{filename}"
except requests.RequestException as e:
logger.error(f"Failed to download poster from {url}: {e}")
return ""
except IOError as e:
logger.error(f"Failed to save poster to {filepath}: {e}")
return ""
except Exception as e:
logger.error(f"Unexpected error downloading poster: {e}")
return ""
@staticmethod
def thumbnail(ydl: Any, url: str, location: str) -> None:
"""Download and save a video thumbnail."""
try:
video_info = ydl.extract_info(url, download=False)
thumbnail_url = video_info.get('thumbnail')
if thumbnail_url:
try:
thumbnail_filename = os.path.join(location, f"{video_info['id']}.jpg")
response = requests.get(thumbnail_url, timeout=10)
response.raise_for_status()
with open(thumbnail_filename, 'wb') as thumbnail_file:
thumbnail_file.write(response.content)
logger.info("Downloaded MP4 and thumbnail successfully")
except (requests.RequestException, IOError) as e:
logger.error(f"Error downloading thumbnail: {e}")
else:
logger.info("Downloaded MP4 but no thumbnail found")
except Exception as e:
logger.error(f"Error extracting video info for thumbnail: {e}")

app/download/dropout.py

@@ -1,224 +0,0 @@
"""Dropout.tv downloader."""
import os
import yt_dlp
import json
import requests
import re
import logging
from typing import Optional, List
import config
from download.base import my_hook, ArchiveOnlyYDL, grab
logger = logging.getLogger("syllabus")
class dropout:
"""Dropout.tv content downloader."""
@staticmethod
def archive(show: str, season: int) -> None:
"""Add a season to archive without downloading."""
try:
with open(config.DROPOUT_JSON, 'r') as json_file:
url_mapping = json.load(json_file, object_pairs_hook=dict)
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Failed to read dropout JSON: {e}")
raise
url = next((item['URL'] for item in url_mapping if item['SHOW'] == show), None)
if url is None:
raise ValueError(f"Show '{show}' not found in the JSON data.")
playlist_url = f'{url}/season:{season}'
dl_opts = {
'quiet': True,
'cookiefile': str(config.DROPOUT_COOKIES),
'download_archive': str(config.DROPOUT_ARCHIVE),
'skip_download': True,
}
try:
with ArchiveOnlyYDL(dl_opts) as ydl:
ydl.download([playlist_url])
logger.info(f"Archived show {show}, season {season}")
except Exception as e:
logger.error(f"Error archiving show {show}, season {season}: {e}")
raise
@staticmethod
def custom(url: str, directory: str, prefix: Optional[str] = None) -> None:
"""Download content from a custom URL with optional prefix."""
try:
filename_template = f"{prefix}%(title)s.%(ext)s" if prefix else "%(title)s.%(ext)s"
dl_opts = {
'progress_hooks': [my_hook],
'download_archive': str(config.DROPOUT_ARCHIVE),
'format': config.DEFAULT_FORMAT,
'audio_quality': '256K',
'paths': {
'temp': str(config.TEMP_DIR),
'home': directory,
},
'cookiefile': str(config.DROPOUT_COOKIES),
'writesubtitles': True,
'subtitleslangs': ['en'],
'outtmpl': filename_template,
}
with yt_dlp.YoutubeDL(dl_opts) as ydl:
ydl.download([url] if isinstance(url, str) else url)
logger.info(f"Custom download completed for {url}")
except Exception as e:
logger.error(f"Error in custom download: {e}")
raise
@staticmethod
def show(show: str, season: int, specials: bool = False, episode_start: Optional[int] = None) -> None:
"""Download a season of a show from dropout.tv."""
try:
season_str = f"{int(season):02}" if not specials else "00"
season_type = 'Specials' if specials else f'Season {season}'
directory = str(config.TV_DIR / show / season_type)
os.makedirs(directory, exist_ok=True)
with open(config.DROPOUT_JSON, 'r') as json_file:
url_mapping = json.load(json_file, object_pairs_hook=dict)
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Failed to read dropout JSON: {e}")
raise
url = next((item['URL'] for item in url_mapping if item['SHOW'] == show), None)
if url is None:
raise ValueError(f"Show '{show}' not found in the JSON data.")
playlist_url = f'{url}/season:{season}'
# Match filter logic
filter_pattern = (
"title "
f"{'~=' if specials else '!~='} "
r"'(?i).*behind.?the.?scenes.*"
r"|.*trailer.*"
r"|.*recap.*"
r"|.*last.looks.*'"
)
match_filter = yt_dlp.utils.match_filter_func(filter_pattern)
ydl_opts = {
'quiet': True,
'skip_download': True,
'cookiefile': str(config.DROPOUT_COOKIES),
}
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
playlist_info = ydl.extract_info(playlist_url, download=False)
except Exception as e:
logger.error(f"Failed to extract playlist info: {e}")
raise
entries = playlist_info.get('entries', [])
filtered_entries = [entry for entry in entries if match_filter(entry) is None]
episode_start_num = int(episode_start) if episode_start else 1
for i, entry in enumerate(filtered_entries, start=episode_start_num):
episode_number = f"{i:02}"
filename_template = f"{show} - S{season_str}E{episode_number} - %(title)s.%(ext)s"
dl_opts = {
'progress_hooks': [my_hook],
'download_archive': str(config.DROPOUT_ARCHIVE),
'format': config.DEFAULT_FORMAT,
'audio_quality': '256K',
'paths': {
'temp': str(config.TEMP_DIR),
'home': directory
},
'cookiefile': str(config.DROPOUT_COOKIES),
'writesubtitles': True,
'subtitleslangs': ['en'],
'outtmpl': filename_template,
}
try:
with yt_dlp.YoutubeDL(dl_opts) as ydl:
ydl.download([entry['webpage_url']])
except Exception as e:
logger.error(f"Error downloading episode {episode_number}: {e}")
continue
@staticmethod
def series(force_download: bool = False) -> None:
"""Update the series list from dropout.tv."""
from bs4 import BeautifulSoup
try:
response = requests.get(f'{config.DROPOUT_BASE_URL}/series', timeout=10)
response.raise_for_status()
html = response.text
soup = BeautifulSoup(html, 'html.parser')
elements = soup.find_all('a', class_='browse-item-link')
shows = []
for element in elements:
show_data = {}
show_data['href'] = element.get('href', '')
img = element.find('img')
if img:
show_data['src'] = img.get('src', '')
show_data['alt'] = img.get('alt', '')
shows.append(show_data)
# Load existing shows to merge
existing_shows = {}
try:
with open(config.DROPOUT_JSON, 'r') as f:
existing_data = json.load(f, object_pairs_hook=dict)
for show in existing_data:
existing_shows[show['SHOW']] = show
except (FileNotFoundError, json.JSONDecodeError):
existing_data = []
# Merge with new scraped shows
json_data = []
scraped_titles = set()
for show in shows:
show_title = show.get('alt', 'No title')
scraped_titles.add(show_title)
info_data = {}
info_data['SHOW'] = show_title
info_data['URL'] = show.get('href', 'No link')
info_data['LINK'] = re.sub(r".*dropout.tv/", "", show.get('href', ''))
poster_path = grab.poster(show.get('src', ''), show.get('alt', ''), force_download)
if poster_path:
info_data['POSTER'] = poster_path
json_data.append(info_data)
# Add back any manually added shows that weren't scraped
for show_title, show_data in existing_shows.items():
if show_title not in scraped_titles:
json_data.append(show_data)
os.makedirs(config.DATA_DIR, exist_ok=True)
with open(config.DROPOUT_JSON, 'w') as json_file:
json.dump(json_data, json_file, indent=4, separators=(',', ': '))
logger.info(f"Updated series list with {len(json_data)} shows (merged with existing)")
except requests.RequestException as e:
logger.error(f"Failed to fetch series list: {e}")
raise
except (IOError, json.JSONDecodeError) as e:
logger.error(f"Failed to save series JSON: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error updating series: {e}")
raise

app/download/youtube.py

@@ -1,58 +0,0 @@
"""YouTube downloader."""
import logging
from typing import Any, Dict
import config
from download.base import my_hook
import yt_dlp
logger = logging.getLogger("syllabus")
class youtube:
"""YouTube content downloader."""
@staticmethod
def ydl(url: str, location: str) -> None:
"""Download a YouTube video to the specified location."""
try:
logger.info(f'Received download request for {url}')
dl_ops: Dict[str, Any] = {
'progress_hooks': [my_hook],
'download_archive': str(config.YOUTUBE_ARCHIVE),
'paths': {
'temp': str(config.TEMP_DIR),
'home': location
},
'outtmpl': '%(uploader)s/%(title)s.%(ext)s'
}
# Audio format configuration
audio_format_config = {
'format': config.AUDIO_FORMAT,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': config.AUDIO_QUALITY,
}, {
'key': 'FFmpegMetadata',
'add_metadata': True,
}]
}
# Apply format-specific options based on location
if location == str(config.PODCASTS_DIR) or location == str(config.ASMR_DIR):
dl_ops.update(audio_format_config)
elif location == str(config.YOUTUBE_DIR):
dl_ops['format'] = 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best'
dl_ops['cookiefile'] = str(config.YOUTUBE_COOKIES)
else:
dl_ops['format'] = 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best'
with yt_dlp.YoutubeDL(dl_ops) as ydl:
ydl.download([url])
logger.info(f"Download completed for {url}")
except Exception as e:
logger.error(f"Error downloading {url}: {e}")
raise

app/main.py

@@ -1,38 +1,269 @@
"""Main FastAPI application entry point."""
from fastapi import FastAPI, Request, Form, BackgroundTasks
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from functools import partial
import json, download, asyncio
from typing import Optional
import logging, os
from logging.handlers import TimedRotatingFileHandler
from core.app import create_app
from routes import api_router, web_router
from routes.scheduler import router as scheduler_router
from core.scheduler import init_scheduler, shutdown_scheduler
import logging
# Ensure log directory exists
os.makedirs("/data/logs", exist_ok=True)
# Create the FastAPI app
app = create_app()
# Get logger
# Setup timed rotating logger
# log_path = "/data/logs/syllabus.log"
logger = logging.getLogger("syllabus")
logger.setLevel(logging.DEBUG)
# Include routers
app.include_router(api_router)
app.include_router(web_router)
app.include_router(scheduler_router)
# Remove any default handlers
logger.handlers = []
# Initialize scheduler on startup
@app.on_event("startup")
async def startup():
"""Initialize scheduler on startup."""
await init_scheduler()
logger.info("Scheduler started")
# Set up TimedRotatingFileHandler
handler = TimedRotatingFileHandler(
filename="/data/logs/syllabus.log",
when="midnight", # Rotate at midnight
interval=30, # Every 30 days
backupCount=12, # Keep the last 12 logs
encoding="utf-8",
utc=False # Rotate on local time, not UTC
)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# App setup
app = FastAPI()
app.mount("/data", StaticFiles(directory="/data"), name="data")
templates = Jinja2Templates(directory="templates")
loop = asyncio.get_event_loop()
# Optional cache
cached_data = None
@app.on_event("shutdown")
async def shutdown():
"""Shutdown scheduler on app shutdown."""
shutdown_scheduler()
logger.info("Scheduler stopped")
logger.info("Application initialized successfully")
# Middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
try:
response = await call_next(request)
except Exception as e:
logger.exception(f"EXCEPTION: {request.method} {request.url} - {str(e)}")
return JSONResponse(
status_code=500,
content={"detail": "Internal Server Error"},
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
logger.info(
f"request_client={request.client.host}:{request.client.port}, "
f"request_method={request.method}, request_url={request.url}, "
f"status_code={response.status_code}"
)
return response
# api
# @app.post("/ebook/download", description="Download an ebook via a url.")
# async def ebookDownload(
# background_tasks: BackgroundTasks,
# url: str = Form(...),
# author: str = Form(...)
# ):
# try:
# background_tasks.add_task(download.ebook,url,author)
# # download.dropout.show(show,season,episode)
# return JSONResponse(status_code=200, content={"status": "success", "message": "Book downloaded."})
# except Exception as e:
# return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
@app.get("/dropout/update")
async def dropoutUpdate(force: bool = False):
global cached_data
try:
download.dropout.series(force)
with open('/data/dropout.json') as f:
cached_data = json.load(f)
return JSONResponse(status_code=200, content={"status": "success", "message": "Series grab complete."})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
@app.get("/dropout/series")
async def dropoutSeries():
global cached_data
if cached_data is None:
await dropoutUpdate()
try:
return JSONResponse(content=cached_data)
except Exception:
return JSONResponse(content={"error": "File not found"}, status_code=404)
async def get_show_data(show: str, force: bool = False):
global cached_data
if cached_data is None:
await dropoutUpdate()
for item in cached_data:
if show == item["SHOW"] or show == item["LINK"]:
if "SEASONS" not in item or force is not False:
item['SEASONS'] = download.grab.season(item['URL'])
return item
return None
def get_latest_season(item):
seasons = item.get("SEASONS")
if seasons and isinstance(seasons, list):
try:
numeric_seasons = [int(s) for s in seasons if str(s).isdigit()]
if numeric_seasons:
return max(numeric_seasons)
except Exception as e:
logger.error(f"Error getting latest season: {e}")
return None
@app.post("/dropout/custom", description="")
async def dropout_download(
background_tasks: BackgroundTasks,
url: str = Form(...),
directory: str = Form(...),
prefix: Optional[str] = Form(None)
):
# Ensure output directory exists
os.makedirs(directory, exist_ok=True)
try:
background_tasks.add_task(download.dropout.custom, url, directory, prefix)
return {"status": "success", "message": "Download started"}
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": f"Download failed: {str(e)}"})
@app.post("/dropout/download", description="Download an entire season from episode 1. Ignores behind the scenes and trailers.")
async def dropout_download(
background_tasks: BackgroundTasks,
show: str = Form(...),
season: Optional[int] = Form(None),
latest: bool = Form(True),
archive: bool = Form(False),
specials: bool = Form(False),
episode_start: Optional[int] = Form(None)
):
try:
# Resolve latest season if requested
if latest and season is None:
show_data = await get_show_data(show, True)
if not show_data:
return JSONResponse(
status_code=404,
content={"status": "error", "message": "Show not found"}
)
season = get_latest_season(show_data)
if season is None:
return JSONResponse(
status_code=400,
content={"status": "error", "message": "No valid seasons found"}
)
# Ensure season is specified by now
if season is None:
return JSONResponse(
status_code=400,
content={"status": "error", "message": "Season is required unless 'latest' is used."}
)
task_msg = f"{'Adding to archive' if archive else 'Starting download'} for show '{show}', season {season}{' specials' if specials else ''}."
logger.info(f"message={task_msg}")
# Schedule the background task
if archive:
background_tasks.add_task(download.dropout.archive, show, season)
else:
background_tasks.add_task(download.dropout.show, show, season, specials, episode_start)
return JSONResponse(
status_code=200,
content={
"status": "success",
"message": (task_msg)
}
)
except Exception as e:
logger.exception(f"Unhandled exception during /dropout/download: {e}")
return JSONResponse(
status_code=500,
content={"status": "error", "message": "An unexpected error occurred."}
)
# @app.post("/dropout/download/specials", description="Downloads a seasons behind the scenes and trailers, ignores main episodes.")
# async def dropoutDownload(
# background_tasks: BackgroundTasks,
# show: str = Form(...),
# season: int = Form(...),
# episode: Optional[int] = Form(None)
# ):
# try:
# logger.info(f'message=Received download request for specials of season {season} of {show}.')
# background_tasks.add_task(download.dropout.specials,show,season,episode)
# # download.dropout.show(show,season,episode)
# return JSONResponse(status_code=200, content={"status": "success", "message": "Series downloaded."})
# except Exception as e:
# return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
@app.post("/ydl")
async def ydl(background_tasks: BackgroundTasks, url: str = Form(...), location: str = Form(...)):
try:
background_tasks.add_task(download.youtube.ydl, url, location)
# download.youtube.ydl(url,location)
# grab.thumbnail(ydl,url,location)
return JSONResponse(status_code=200, content={"status": "success", "message": "Video download completed."})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
#web ui
@app.get("/", include_in_schema=False, response_class=HTMLResponse)
async def index(request: Request):
global cached_data
try:
if cached_data is None:
await dropoutUpdate()
return templates.TemplateResponse("index.html", {"request": request, "data": cached_data})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
@app.get("/show/{show}", include_in_schema=False, response_class=HTMLResponse)
async def show_page(request: Request, show: str):
try:
item = await get_show_data(show)
if item:
return templates.TemplateResponse("show.html", {"request": request, "show": item})
else:
return JSONResponse(status_code=404, content={"status": "error", "message": "Show not found"})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
@app.get("/ydl", include_in_schema=False)
async def ydl_page(request: Request):
try:
return templates.TemplateResponse("ydl.html", {"request": request})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
@app.get("/dropout", include_in_schema=False)
async def dropout_page(request: Request):
global cached_data
if cached_data is None:
await dropoutUpdate()
try:
return templates.TemplateResponse("dropout.html", {"request": request, "data": cached_data})
except Exception as e:
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})

app/routes/__init__.py

@@ -1,6 +0,0 @@
"""Routes module exports."""
from routes.api import router as api_router
from routes.web import router as web_router
__all__ = ["api_router", "web_router"]

app/routes/api.py

@@ -1,314 +0,0 @@
"""API Routes for media downloading."""
import logging
import json
import os
import re
from typing import Optional, Dict, Any
from fastapi import APIRouter, BackgroundTasks, Form
from fastapi.responses import JSONResponse
import config
import download
from core.cache import series_cache
logger = logging.getLogger("syllabus")
router = APIRouter(prefix="/api", tags=["API"])
@router.get("/dropout/update", description="Update the series list from dropout.tv")
async def dropout_update_route(background_tasks: BackgroundTasks, force: bool = False) -> JSONResponse:
"""Queue series list update as background task."""
try:
background_tasks.add_task(download.dropout.series, force)
return JSONResponse(status_code=202, content={"status": "success", "message": "Series update queued in background"})
except Exception as e:
logger.error(f"Error queuing series update: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
@router.get("/posters/update", description="Force update all show posters")
async def posters_update_route(background_tasks: BackgroundTasks) -> JSONResponse:
"""Queue poster update as background task."""
try:
background_tasks.add_task(download.dropout.series, True)
return JSONResponse(status_code=202, content={"status": "success", "message": "Poster update queued in background"})
except Exception as e:
logger.error(f"Error queuing poster update: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
@router.post("/dropout/upload-html", description="Upload Dropout series HTML for manual scraping")
async def dropout_upload_html(html: str = Form(...)) -> JSONResponse:
"""
Upload HTML from Dropout series page to extract shows manually.
Useful when the automatic scraper misses shows that require scrolling.
"""
try:
from bs4 import BeautifulSoup
logger.info(f"HTML upload received: {len(html)} characters")
if not html or len(html) < 100:
return JSONResponse(
status_code=400,
content={"status": "error", "message": "HTML content is too short or empty"}
)
soup = BeautifulSoup(html, 'html.parser')
elements = soup.find_all('a', class_='browse-item-link')
logger.info(f"Found {len(elements)} show elements in HTML")
if not elements:
return JSONResponse(
status_code=400,
content={"status": "error", "message": "No shows found in HTML. Make sure you copied the full page HTML from https://watch.dropout.tv/series"}
)
shows = []
for element in elements:
show_data = {}
show_data['href'] = element.get('href', '')
img = element.find('img')
if img:
show_data['src'] = img.get('src', '')
show_data['alt'] = img.get('alt', '')
shows.append(show_data)
logger.info(f"Processing {len(shows)} shows for poster download")
# Load existing shows to merge
existing_shows = {}
try:
with open(config.DROPOUT_JSON, 'r') as f:
existing_data = json.load(f, object_pairs_hook=dict)
for show in existing_data:
existing_shows[show['SHOW']] = show
except (FileNotFoundError, json.JSONDecodeError):
existing_data = []
# Process new shows
json_data = list(existing_data) if existing_data else []
new_count = 0
updated_count = 0
poster_failures = []
for show in shows:
show_title = show.get('alt', 'No title')
info_data = {}
info_data['SHOW'] = show_title
info_data['URL'] = show.get('href', 'No link')
info_data['LINK'] = re.sub(r".*dropout.tv/", "", show.get('href', ''))
# Handle poster URL - prepend base URL if relative
poster_url = show.get('src', '')
if poster_url and not poster_url.startswith('http'):
# Relative path, prepend base URL
poster_url = config.DROPOUT_POSTER_BASE_URL.rstrip('/') + '/' + poster_url.lstrip('./')
logger.debug(f"Processing poster for {show_title}: {poster_url}")
from download.base import grab
poster_path = grab.poster(poster_url, show.get('alt', ''), force_download=False)
if poster_path:
info_data['POSTER'] = poster_path
logger.debug(f"Successfully grabbed poster: {poster_path}")
else:
logger.warning(f"Failed to grab poster for {show_title} from {poster_url}")
poster_failures.append(show_title)
# Check if show exists
if show_title in existing_shows:
# Update existing
idx = next((i for i, s in enumerate(json_data) if s.get('SHOW') == show_title), -1)
if idx >= 0:
json_data[idx] = info_data
updated_count += 1
else:
# Add new
json_data.append(info_data)
new_count += 1
# Save updated JSON
logger.info(f"Saving {len(json_data)} total shows to JSON")
os.makedirs(config.DATA_DIR, exist_ok=True)
with open(config.DROPOUT_JSON, 'w') as f:
json.dump(json_data, f, indent=4, separators=(',', ': '))
series_cache.load_from_file(str(config.DROPOUT_JSON))
logger.info(f"HTML upload complete: {new_count} new, {updated_count} updated")
return JSONResponse(
status_code=200,
content={
"status": "success",
"message": f"Added {new_count} new shows, updated {updated_count} existing shows",
"total_shows": len(json_data),
"new": new_count,
"updated": updated_count,
"poster_failures": len(poster_failures),
"failed_shows": poster_failures[:10] if poster_failures else []
}
)
except Exception as e:
logger.error(f"Error uploading HTML: {e}", exc_info=True)
return JSONResponse(
status_code=500,
content={"status": "error", "message": str(e)}
)
@router.get("/dropout/series", description="Get the list of available shows")
async def dropout_series_route() -> JSONResponse:
"""Get the cached series data."""
try:
data = series_cache.get()
if data is None:
series_cache.load_from_file(str(config.DROPOUT_JSON))
data = series_cache.get()
if data is None:
return JSONResponse(content={"error": "Series data not available"}, status_code=503)
return JSONResponse(content=data)
except Exception as e:
logger.error(f"Error fetching series: {e}")
return JSONResponse(content={"error": "Failed to fetch series"}, status_code=500)
@router.post("/dropout/custom", description="Download content from a custom URL")
async def dropout_custom_route(
background_tasks: BackgroundTasks,
url: str = Form(...),
directory: str = Form(...),
prefix: Optional[str] = Form(None)
) -> JSONResponse:
"""Download from a custom URL with optional prefix."""
try:
import os
os.makedirs(directory, exist_ok=True)
background_tasks.add_task(download.dropout.custom, url, directory, prefix)
return JSONResponse(status_code=202, content={"status": "success", "message": "Download started"})
except Exception as e:
logger.error(f"Error starting custom download: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
@router.post("/dropout/download", description="Download an entire season from episode 1")
async def dropout_download_route(
background_tasks: BackgroundTasks,
show: str = Form(...),
season: Optional[int] = Form(None),
latest: bool = Form(True),
archive: bool = Form(False),
specials: bool = Form(False),
episode_start: Optional[int] = Form(None)
) -> JSONResponse:
"""Download a season of a show."""
try:
# Resolve latest season if requested
if latest and season is None:
show_data = await get_show_data(show, True)
if not show_data:
return JSONResponse(
status_code=404,
content={"status": "error", "message": "Show not found"}
)
season = get_latest_season(show_data)
if season is None:
return JSONResponse(
status_code=400,
content={"status": "error", "message": "No valid seasons found"}
)
# Ensure season is specified by now
if season is None:
return JSONResponse(
status_code=400,
content={"status": "error", "message": "Season is required unless 'latest' is used."}
)
task_msg = f"{'Adding to archive' if archive else 'Starting download'} for show '{show}', season {season}{' specials' if specials else ''}."
logger.info(f"message={task_msg}")
# Schedule the background task
if archive:
background_tasks.add_task(download.dropout.archive, show, season)
else:
background_tasks.add_task(download.dropout.show, show, season, specials, episode_start)
return JSONResponse(
status_code=202,
content={
"status": "success",
"message": task_msg
}
)
except Exception as e:
logger.exception(f"Unhandled exception during /dropout/download: {e}")
return JSONResponse(
status_code=500,
content={"status": "error", "message": "An unexpected error occurred."}
)
@router.post("/ydl", description="Download a YouTube video")
async def youtube_download_route(
background_tasks: BackgroundTasks,
url: str = Form(...),
location: str = Form(...)
) -> JSONResponse:
"""Download a YouTube video to the specified location."""
try:
background_tasks.add_task(download.youtube.ydl, url, location)
return JSONResponse(status_code=202, content={"status": "success", "message": "Download started"})
except Exception as e:
logger.error(f"Error starting YouTube download: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
# Helper functions
async def get_show_data(show: str, force: bool = False) -> Optional[Dict[str, Any]]:
"""Get data for a specific show by name or link."""
try:
cached = series_cache.get()
if cached is None:
series_cache.load_from_file(str(config.DROPOUT_JSON))
cached = series_cache.get()
if cached is None:
return None
for item in cached:
if show == item.get("SHOW") or show == item.get("LINK"):
if "SEASONS" not in item or force:
try:
item['SEASONS'] = download.grab.season(item['URL'])
except Exception as e:
logger.error(f"Failed to fetch seasons for {show}: {e}")
item['SEASONS'] = []
return item
return None
except Exception as e:
logger.error(f"Error getting show data: {e}")
return None
def get_latest_season(item: Dict[str, Any]) -> Optional[int]:
"""Extract the latest season number from show data."""
try:
seasons = item.get("SEASONS")
if seasons and isinstance(seasons, list):
numeric_seasons = [int(s) for s in seasons if str(s).isdigit()]
if numeric_seasons:
return max(numeric_seasons)
except ValueError as e:
logger.error(f"Error parsing season numbers: {e}")
except Exception as e:
logger.error(f"Error getting latest season: {e}")
return None

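A client-side sketch for the upload-html endpoint above, assuming the server is reachable on localhost:8000 and the series page was saved locally (the file name is illustrative):

# Illustrative client for /api/dropout/upload-html.
import requests

with open("series.html", encoding="utf-8") as f:  # saved copy of the series page
    resp = requests.post(
        "http://localhost:8000/api/dropout/upload-html",
        data={"html": f.read()},
        timeout=30,
    )
print(resp.status_code, resp.json())
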
app/routes/scheduler.py

@@ -1,129 +0,0 @@
"""Scheduler API Routes - Manage scheduled tasks."""
import logging
from typing import Optional, List, Dict, Any
from fastapi import APIRouter, Form
from fastapi.responses import JSONResponse
from core.scheduler import add_job, remove_job, get_jobs
logger = logging.getLogger("syllabus")
router = APIRouter(prefix="/api/schedule", tags=["Scheduler"])
@router.post("/add", description="Add a scheduled task")
async def add_scheduled_task(
job_id: str = Form(...),
task: str = Form(...),
cron: str = Form(...),
show: Optional[str] = Form(None),
season: Optional[int] = Form(None),
specials: bool = Form(False)
) -> JSONResponse:
"""
Add a scheduled task.
**Tasks:**
- `download_show`: Download specific show/season (requires: show, season)
- `download_latest`: Download latest season (requires: show)
- `update_series`: Update series list (no params needed)
- `update_posters`: Force re-download all show posters (no params needed)
**Cron Format:** (minute hour day month day_of_week)
- `0 2 * * *` = Daily at 2 AM
- `0 */6 * * *` = Every 6 hours
- `0 0 * * 0` = Weekly on Sunday at midnight
"""
try:
# Validate task type
valid_tasks = ["download_show", "download_latest", "update_series", "update_posters"]
if task not in valid_tasks:
return JSONResponse(
status_code=400,
content={"status": "error", "message": f"Invalid task. Must be one of: {valid_tasks}"}
)
# Build kwargs based on task
kwargs = {}
if task == "download_show":
if not show or season is None:
return JSONResponse(
status_code=400,
content={"status": "error", "message": "download_show requires 'show' and 'season'"}
)
kwargs = {"show": show, "season": season, "specials": specials}
elif task == "download_latest":
if not show:
return JSONResponse(
status_code=400,
content={"status": "error", "message": "download_latest requires 'show'"}
)
kwargs = {"show": show}
# Add the job
success = add_job(job_id, task, cron, kwargs)
if success:
return JSONResponse(
status_code=201,
content={
"status": "success",
"message": f"Job '{job_id}' scheduled",
"job_id": job_id,
"task": task,
"cron": cron
}
)
else:
return JSONResponse(
status_code=500,
content={"status": "error", "message": "Failed to add job"}
)
except Exception as e:
logger.error(f"Error adding scheduled task: {e}")
return JSONResponse(
status_code=500,
content={"status": "error", "message": str(e)}
)
@router.delete("/remove/{job_id}", description="Remove a scheduled task")
async def remove_scheduled_task(job_id: str) -> JSONResponse:
"""Remove a scheduled task by ID."""
try:
success = remove_job(job_id)
if success:
return JSONResponse(
status_code=200,
content={"status": "success", "message": f"Job '{job_id}' removed"}
)
else:
return JSONResponse(
status_code=404,
content={"status": "error", "message": f"Job '{job_id}' not found"}
)
except Exception as e:
logger.error(f"Error removing scheduled task: {e}")
return JSONResponse(
status_code=500,
content={"status": "error", "message": str(e)}
)
@router.get("/list", description="List all scheduled tasks")
async def list_scheduled_tasks() -> JSONResponse:
"""Get list of all scheduled tasks."""
try:
jobs = get_jobs()
return JSONResponse(
status_code=200,
content={
"status": "success",
"count": len(jobs),
"jobs": jobs
}
)
except Exception as e:
logger.error(f"Error listing scheduled tasks: {e}")
return JSONResponse(
status_code=500,
content={"status": "error", "message": str(e)}
)
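
A sketch of driving these endpoints from Python with requests. The base URL assumes a local server on the default HOST/PORT from config, and the show name is hypothetical; adjust for your deployment:

import requests

BASE = "http://127.0.0.1:8000/api/schedule"  # assumed host/port

# Schedule a daily latest-season check for a show at 2 AM.
resp = requests.post(f"{BASE}/add", data={
    "job_id": "watch_example_show",
    "task": "download_latest",
    "show": "Example Show",  # hypothetical show name
    "cron": "0 2 * * *",
})
print(resp.status_code, resp.json())

# List current jobs, then remove the one just added.
print(requests.get(f"{BASE}/list").json())
print(requests.delete(f"{BASE}/remove/watch_example_show").json())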

View File

@ -1,106 +0,0 @@
"""Web UI Routes."""
import logging
from typing import Optional, Dict, Any
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
import config
import download
from core.cache import series_cache
logger = logging.getLogger("syllabus")
router = APIRouter(tags=["Web UI"])
@router.get("/", include_in_schema=False, response_class=HTMLResponse)
async def index_route(request: Request) -> HTMLResponse:
"""Home page showing list of shows."""
try:
data = series_cache.get()
if data is None:
series_cache.load_from_file(str(config.DROPOUT_JSON))
data = series_cache.get()
if data is None:
return HTMLResponse("<h1>Loading...</h1>", status_code=503)
return request.app.templates.TemplateResponse(
"index.html", {"request": request, "data": data}
)
except Exception as e:
logger.error(f"Error rendering index: {e}")
return HTMLResponse(f"<h1>Error: {str(e)}</h1>", status_code=500)
@router.get("/show/{show}", include_in_schema=False, response_class=HTMLResponse)
async def show_route(request: Request, show: str) -> HTMLResponse:
"""Show page with download options."""
try:
item = await get_show_data(show)
if item:
return request.app.templates.TemplateResponse(
"show.html", {"request": request, "show": item}
)
else:
return HTMLResponse("<h1>Show not found</h1>", status_code=404)
except Exception as e:
logger.error(f"Error rendering show page: {e}")
return HTMLResponse(f"<h1>Error: {str(e)}</h1>", status_code=500)
@router.get("/ydl", include_in_schema=False, response_class=HTMLResponse)
async def ydl_page(request: Request) -> HTMLResponse:
"""YouTube downloader page."""
try:
return request.app.templates.TemplateResponse("ydl.html", {"request": request})
except Exception as e:
logger.error(f"Error rendering YDL page: {e}")
return HTMLResponse(f"<h1>Error: {str(e)}</h1>", status_code=500)
@router.get("/dropout", include_in_schema=False, response_class=HTMLResponse)
async def dropout_page(request: Request) -> HTMLResponse:
"""Dropout downloader page."""
try:
data = series_cache.get()
if data is None:
series_cache.load_from_file(str(config.DROPOUT_JSON))
data = series_cache.get()
if data is None:
return HTMLResponse("<h1>Loading...</h1>", status_code=503)
return request.app.templates.TemplateResponse(
"dropout.html", {"request": request, "data": data}
)
except Exception as e:
logger.error(f"Error rendering dropout page: {e}")
return HTMLResponse(f"<h1>Error: {str(e)}</h1>", status_code=500)
# Helper functions
async def get_show_data(show: str, force: bool = False) -> Optional[Dict[str, Any]]:
"""Get data for a specific show by name or link."""
try:
cached = series_cache.get()
if cached is None:
series_cache.load_from_file(str(config.DROPOUT_JSON))
cached = series_cache.get()
if cached is None:
return None
for item in cached:
if show == item.get("SHOW") or show == item.get("LINK"):
if "SEASONS" not in item or force:
try:
item['SEASONS'] = download.grab.season(item['URL'])
except Exception as e:
logger.error(f"Failed to fetch seasons for {show}: {e}")
item['SEASONS'] = []
return item
return None
except Exception as e:
logger.error(f"Error getting show data: {e}")
return None
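
The cache-get, load-from-file, cache-get-again fallback appears three times in this file. A possible shared helper (a sketch, not part of this diff; it would also need List added to the typing import):

def get_series_data() -> Optional[List[Dict[str, Any]]]:
    """Return cached series data, loading DROPOUT_JSON on a cold cache."""
    data = series_cache.get()
    if data is None:
        series_cache.load_from_file(str(config.DROPOUT_JSON))
        data = series_cache.get()
    return data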

View File

@ -60,7 +60,6 @@
.watch-btn {
margin-top: 30px;
margin-right: 10px;
padding: 12px 25px;
font-size: 1em;
background-color: #e50914;
@ -68,40 +67,12 @@
border: none;
border-radius: 5px;
cursor: pointer;
transition: background-color 0.3s;
}
.watch-btn:hover {
background-color: #b20710;
}
.watchlist-btn {
margin-top: 30px;
padding: 12px 25px;
font-size: 1em;
background-color: #404040;
color: #fff;
border: 2px solid #555;
border-radius: 5px;
cursor: pointer;
transition: all 0.3s;
}
.watchlist-btn:hover {
background-color: #505050;
border-color: #777;
}
.watchlist-btn.active {
background-color: #1db954;
border-color: #1aa34a;
}
.watchlist-btn.active:hover {
background-color: #1aa34a;
border-color: #16a336;
}
a {
color: #ccc;
display: inline-block;
@ -130,87 +101,11 @@
</div>
<button class="watch-btn" onclick="watchShow()">▶ Watch Now</button>
<button id="watchlist-btn" class="watchlist-btn" onclick="toggleWatchlist()">+ Add to Watchlist</button>
<p><a href="/">← Back to all shows</a></p>
</div>
<script>
const showName = "{{ show['SHOW'] }}";
// Initialize watchlist button state
async function initWatchlistButton() {
try {
const response = await fetch('/api/schedule/list');
const data = await response.json();
const watchlistJobId = `watch_${showName.toLowerCase().replace(/\s+/g, '_')}`;
const isWatchlisted = data.jobs.some(job => job.job_id === watchlistJobId);
const btn = document.getElementById('watchlist-btn');
if (isWatchlisted) {
btn.textContent = '✓ Remove from Watchlist';
btn.classList.add('active');
} else {
btn.textContent = '+ Add to Watchlist';
btn.classList.remove('active');
}
} catch (err) {
console.error('Error initializing watchlist button:', err);
}
}
// Toggle watchlist
async function toggleWatchlist() {
const btn = document.getElementById('watchlist-btn');
const watchlistJobId = `watch_${showName.toLowerCase().replace(/\s+/g, '_')}`;
try {
// Check if already watchlisted
const response = await fetch('/api/schedule/list');
const data = await response.json();
const isWatchlisted = data.jobs.some(job => job.job_id === watchlistJobId);
if (isWatchlisted) {
// Remove from watchlist
const deleteResponse = await fetch(`/api/schedule/remove/${watchlistJobId}`, {
method: 'DELETE'
});
if (deleteResponse.ok) {
btn.textContent = '+ Add to Watchlist';
btn.classList.remove('active');
alert(`${showName} removed from watchlist.`);
} else {
alert('Error removing from watchlist.');
}
} else {
// Add to watchlist (schedule daily at 7:10 PM)
const formData = new FormData();
formData.append('job_id', watchlistJobId);
formData.append('task', 'download_latest');
formData.append('show', showName);
formData.append('cron', '10 19 * * *'); // Daily at 7:10 PM
const addResponse = await fetch('/api/schedule/add', {
method: 'POST',
body: formData
});
if (addResponse.ok) {
btn.textContent = '✓ Remove from Watchlist';
btn.classList.add('active');
alert(`${showName} added to watchlist. New episodes will download daily at 7:10 PM.`);
} else {
alert('Error adding to watchlist.');
}
}
} catch (err) {
console.error(err);
alert('Error toggling watchlist.');
}
}
function downloadSeason(season) {
const formData = new FormData();
formData.append("show", "{{ show['SHOW'] }}");
@ -255,9 +150,6 @@
alert("Error starting playback.");
});
}
// Initialize watchlist button when page loads
document.addEventListener('DOMContentLoaded', initWatchlistButton);
</script>
</body>
</html>
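
The watchlist button hard-codes its schedule as a crontab string. A quick sanity check of what '10 19 * * *' parses to, using the apscheduler dependency already listed in requirements (a sketch; the printed field layout may vary by APScheduler version):

from apscheduler.triggers.cron import CronTrigger

trigger = CronTrigger.from_crontab("10 19 * * *")
print(trigger)  # shows the parsed fields, e.g. hour='19', minute='10' (daily at 7:10 PM)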

View File

@ -1,9 +1,9 @@
beautifulsoup4
bs4
yt_dlp
requests
fastapi
pathlib
uvicorn
jinja2
python-multipart
python-dotenv
apscheduler
selenium