"""Dropout.tv downloader.""" import os import yt_dlp import json import requests import re import logging from typing import Optional, List import config from download.base import my_hook, ArchiveOnlyYDL, grab logger = logging.getLogger("syllabus") class dropout: """Dropout.tv content downloader.""" @staticmethod def archive(show: str, season: int) -> None: """Add a season to archive without downloading.""" try: with open(config.DROPOUT_JSON, 'r') as json_file: url_mapping = json.load(json_file, object_pairs_hook=dict) except (IOError, json.JSONDecodeError) as e: logger.error(f"Failed to read dropout JSON: {e}") raise url = next((item['URL'] for item in url_mapping if item['SHOW'] == show), None) if url is None: raise ValueError(f"Show '{show}' not found in the JSON data.") playlist_url = f'{url}/season:{season}' dl_opts = { 'quiet': True, 'cookiefile': str(config.DROPOUT_COOKIES), 'download_archive': str(config.DROPOUT_ARCHIVE), 'skip_download': True, } try: with ArchiveOnlyYDL(dl_opts) as ydl: ydl.download([playlist_url]) logger.info(f"Archived show {show}, season {season}") except Exception as e: logger.error(f"Error archiving show {show}, season {season}: {e}") raise @staticmethod def custom(url: str, directory: str, prefix: Optional[str] = None) -> None: """Download content from a custom URL with optional prefix.""" try: filename_template = f"{prefix}%(title)s.%(ext)s" if prefix else "%(title)s.%(ext)s" dl_opts = { 'progress_hooks': [my_hook], 'download_archive': str(config.DROPOUT_ARCHIVE), 'format': config.DEFAULT_FORMAT, 'audio_quality': '256K', 'paths': { 'temp': str(config.TEMP_DIR), 'home': directory, }, 'cookiefile': str(config.DROPOUT_COOKIES), 'writesubtitles': True, 'subtitleslangs': ['en'], 'outtmpl': filename_template, } with yt_dlp.YoutubeDL(dl_opts) as ydl: ydl.download([url] if isinstance(url, str) else url) logger.info(f"Custom download completed for {url}") except Exception as e: logger.error(f"Error in custom download: {e}") raise @staticmethod def show(show: str, season: int, specials: bool = False, episode_start: Optional[int] = None) -> None: """Download a season of a show from dropout.tv.""" try: season_str = f"{int(season):02}" if not specials else "00" season_type = 'Specials' if specials else f'Season {season}' directory = str(config.TV_DIR / show / season_type) os.makedirs(directory, exist_ok=True) with open(config.DROPOUT_JSON, 'r') as json_file: url_mapping = json.load(json_file, object_pairs_hook=dict) except (IOError, json.JSONDecodeError) as e: logger.error(f"Failed to read dropout JSON: {e}") raise url = next((item['URL'] for item in url_mapping if item['SHOW'] == show), None) if url is None: raise ValueError(f"Show '{show}' not found in the JSON data.") playlist_url = f'{url}/season:{season}' # Match filter logic filter_pattern = ( "title " f"{'~=' if specials else '!~='} " r"'(?i).*behind.?the.?scenes.*" r"|.*trailer.*" r"|.*recap.*" r"|.*last.looks.*'" ) match_filter = yt_dlp.utils.match_filter_func(filter_pattern) ydl_opts = { 'quiet': True, 'skip_download': True, 'cookiefile': str(config.DROPOUT_COOKIES), } try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: playlist_info = ydl.extract_info(playlist_url, download=False) except Exception as e: logger.error(f"Failed to extract playlist info: {e}") raise entries = playlist_info.get('entries', []) filtered_entries = [entry for entry in entries if match_filter(entry) is None] episode_start_num = int(episode_start) if episode_start else 1 for i, entry in enumerate(filtered_entries, start=episode_start_num): episode_number = f"{i:02}" filename_template = f"{show} - S{season_str}E{episode_number} - %(title)s.%(ext)s" dl_opts = { 'progress_hooks': [my_hook], 'download_archive': str(config.DROPOUT_ARCHIVE), 'format': config.DEFAULT_FORMAT, 'audio_quality': '256K', 'paths': { 'temp': str(config.TEMP_DIR), 'home': directory }, 'cookiefile': str(config.DROPOUT_COOKIES), 'writesubtitles': True, 'subtitleslangs': ['en'], 'outtmpl': filename_template, } try: with yt_dlp.YoutubeDL(dl_opts) as ydl: ydl.download([entry['webpage_url']]) except Exception as e: logger.error(f"Error downloading episode {episode_number}: {e}") continue @staticmethod def series(force_download: bool = False) -> None: """Update the series list from dropout.tv.""" from bs4 import BeautifulSoup try: response = requests.get(f'{config.DROPOUT_BASE_URL}/series', timeout=10) response.raise_for_status() html = response.text soup = BeautifulSoup(html, 'html.parser') elements = soup.find_all('a', class_='browse-item-link') shows = [] for element in elements: show_data = {} show_data['href'] = element.get('href', '') img = element.find('img') if img: show_data['src'] = img.get('src', '') show_data['alt'] = img.get('alt', '') shows.append(show_data) # Load existing shows to merge existing_shows = {} try: with open(config.DROPOUT_JSON, 'r') as f: existing_data = json.load(f, object_pairs_hook=dict) for show in existing_data: existing_shows[show['SHOW']] = show except (FileNotFoundError, json.JSONDecodeError): existing_data = [] # Merge with new scraped shows json_data = [] scraped_titles = set() for show in shows: show_title = show.get('alt', 'No title') scraped_titles.add(show_title) info_data = {} info_data['SHOW'] = show_title info_data['URL'] = show.get('href', 'No link') info_data['LINK'] = re.sub(r".*dropout.tv/", "", show.get('href', '')) poster_path = grab.poster(show.get('src', ''), show.get('alt', ''), force_download) if poster_path: info_data['POSTER'] = poster_path json_data.append(info_data) # Add back any manually added shows that weren't scraped for show_title, show_data in existing_shows.items(): if show_title not in scraped_titles: json_data.append(show_data) os.makedirs(config.DATA_DIR, exist_ok=True) with open(config.DROPOUT_JSON, 'w') as json_file: json.dump(json_data, json_file, indent=4, separators=(',', ': ')) logger.info(f"Updated series list with {len(json_data)} shows (merged with existing)") except requests.RequestException as e: logger.error(f"Failed to fetch series list: {e}") raise except (IOError, json.JSONDecodeError) as e: logger.error(f"Failed to save series JSON: {e}") raise except Exception as e: logger.error(f"Unexpected error updating series: {e}") raise