140 lines
5.4 KiB
Python
140 lines
5.4 KiB
Python
"""Base classes and utilities for downloading."""
|
|
|
|
import os
|
|
import yt_dlp
|
|
import requests
|
|
import logging
|
|
from typing import Dict, Any, Optional
|
|
import config
|
|
|
|
logger = logging.getLogger("syllabus")
|
|
|
|
# Global or outer-scope tracking dictionary
|
|
last_logged_percent = {}
|
|
|
|
|
|
def my_hook(d: Dict[str, Any]) -> None:
|
|
"""Logging hook for yt_dlp download progress."""
|
|
status = d.get('status')
|
|
filename = d.get('filename')
|
|
|
|
if status == 'downloading':
|
|
total_bytes = d.get('total_bytes') or d.get('total_bytes_estimate')
|
|
downloaded = d.get('downloaded_bytes', 0)
|
|
|
|
if total_bytes and filename:
|
|
percent = int(downloaded / total_bytes * 100)
|
|
current_value = last_logged_percent.get(filename, -10)
|
|
|
|
if percent >= current_value + 10:
|
|
last_logged_percent[filename] = (percent // 10) * 10
|
|
logger.info(
|
|
f"Downloading: {d.get('_percent_str')} at {d.get('_speed_str')} for {filename}"
|
|
)
|
|
|
|
elif status == 'finished':
|
|
logger.info(f"Download completed: {filename}")
|
|
last_logged_percent.pop(filename, None)
|
|
|
|
elif status == 'error':
|
|
logger.error(f"Error occurred: {d.get('error')}")
|
|
elif status == 'postprocessing':
|
|
logger.info(f"Post-processing: {filename}")
|
|
elif status == 'processing':
|
|
logger.info(f"Processing: {filename}")
|
|
|
|
|
|
class ArchiveOnlyYDL(yt_dlp.YoutubeDL):
|
|
"""Custom YoutubeDL class that only updates archive without downloading."""
|
|
def process_info(self, info_dict: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Record download in archive without actually downloading."""
|
|
self.record_download_archive(info_dict)
|
|
self.to_screen(f"Archived: {info_dict.get('title')}")
|
|
return info_dict
|
|
|
|
|
|
class grab:
|
|
"""Utilities for grabbing media metadata and thumbnails."""
|
|
|
|
@staticmethod
|
|
def season(url: str) -> list:
|
|
"""Extract available seasons from a show URL."""
|
|
try:
|
|
from bs4 import BeautifulSoup
|
|
page_html = requests.get(url, timeout=10)
|
|
page_html.raise_for_status()
|
|
soup = BeautifulSoup(page_html.text, 'html.parser')
|
|
select_element = soup.find('select', class_='js-switch-season')
|
|
if not select_element:
|
|
logger.warning(f"Season select element not found for URL: {url}")
|
|
return []
|
|
options = select_element.find_all('option')
|
|
option_values = [option['value'] for option in options if option.has_attr('value')]
|
|
seasons = [item.replace(url + '/season:', '') for item in option_values]
|
|
return seasons
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to fetch seasons from {url}: {e}")
|
|
return []
|
|
except Exception as e:
|
|
logger.error(f"Error parsing seasons: {e}")
|
|
return []
|
|
|
|
@staticmethod
|
|
def poster(url: str, name: str, force_download: bool, save_dir: Optional[str] = None) -> str:
|
|
"""Download and save a poster image."""
|
|
from urllib.parse import urlsplit
|
|
import re
|
|
|
|
if save_dir is None:
|
|
save_dir = str(config.POSTERS_DIR)
|
|
try:
|
|
alt_value = name
|
|
path = urlsplit(url).path
|
|
ext = os.path.splitext(path)[-1] or '.jpeg'
|
|
|
|
safe_name = re.sub(r'[^a-zA-Z0-9\s]', '', alt_value).replace(' ', '_')
|
|
filename = f"{safe_name}{ext}"
|
|
filepath = os.path.join(save_dir, filename)
|
|
|
|
if not os.path.exists(filepath) or force_download:
|
|
os.makedirs(save_dir, exist_ok=True)
|
|
response = requests.get(url, timeout=10)
|
|
response.raise_for_status()
|
|
with open(filepath, 'wb') as handler:
|
|
handler.write(response.content)
|
|
logger.debug(f"Downloaded poster to {filepath}")
|
|
|
|
# Return relative web path instead of absolute filesystem path
|
|
return f"/data/posters/{filename}"
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to download poster from {url}: {e}")
|
|
return ""
|
|
except IOError as e:
|
|
logger.error(f"Failed to save poster to {filepath}: {e}")
|
|
return ""
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error downloading poster: {e}")
|
|
return ""
|
|
|
|
@staticmethod
|
|
def thumbnail(ydl: Any, url: str, location: str) -> None:
|
|
"""Download and save a video thumbnail."""
|
|
try:
|
|
video_info = ydl.extract_info(url, download=False)
|
|
thumbnail_url = video_info.get('thumbnail')
|
|
|
|
if thumbnail_url:
|
|
try:
|
|
thumbnail_filename = os.path.join(location, f"{video_info['id']}.jpg")
|
|
response = requests.get(thumbnail_url, timeout=10)
|
|
response.raise_for_status()
|
|
with open(thumbnail_filename, 'wb') as thumbnail_file:
|
|
thumbnail_file.write(response.content)
|
|
logger.info("Downloaded MP4 and thumbnail successfully")
|
|
except (requests.RequestException, IOError) as e:
|
|
logger.error(f"Error downloading thumbnail: {e}")
|
|
else:
|
|
logger.info("Downloaded MP4 but no thumbnail found")
|
|
except Exception as e:
|
|
logger.error(f"Error extracting video info for thumbnail: {e}")
|