294 lines
11 KiB
Python
294 lines
11 KiB
Python
import os, yt_dlp, json, requests, re, logging
|
|
from bs4 import BeautifulSoup
|
|
from urllib.parse import urlsplit
|
|
|
|
logger = logging.getLogger("syllabus")

# Per-file record of the last 10%-step we logged (0, 10, 20, ...), so each
# download emits at most one progress line per decile.
last_logged_percent: dict[str, int] = {}


def my_hook(d):  # logging hook
    """yt-dlp progress hook: log download progress, completion and errors.

    Emits at most one "Downloading" line per 10% of progress per file,
    tracked in the module-level ``last_logged_percent`` dict, plus one line
    for the finished / error / postprocessing states.

    Args:
        d: the status dict yt-dlp passes to progress hooks.
    """
    status = d.get('status')
    filename = d.get('filename')

    if status == 'downloading':
        # 'total_bytes' is exact when the server reports it; otherwise fall
        # back to yt-dlp's estimate.
        total_bytes = d.get('total_bytes') or d.get('total_bytes_estimate')
        downloaded = d.get('downloaded_bytes', 0)

        if total_bytes and filename:
            percent = int(downloaded / total_bytes * 100)
            # -10 sentinel guarantees the very first hook call (0%) logs.
            current_value = last_logged_percent.get(filename, -10)

            if percent >= current_value + 10:
                # Floor to the decile so irregular hook intervals (13%,
                # 27%, ...) still log once per 10% step.
                last_logged_percent[filename] = (percent // 10) * 10
                # Lazy %-args (not f-strings) per logging best practice;
                # include the filename so concurrent downloads are
                # distinguishable (previously logged a useless placeholder).
                logger.info(
                    "Downloading: %s at %s for %s",
                    d.get('_percent_str'), d.get('_speed_str'), filename,
                )

    elif status == 'finished':
        logger.info("Download completed: %s", filename)
        # Drop per-file state so a later re-download logs from 0% again.
        last_logged_percent.pop(filename, None)

    elif status == 'error':
        logger.error("Error occurred: %s", d.get('error'))

    elif status == 'postprocessing':
        logger.info("Post-processing: %s", filename)

    elif status == 'processing':
        logger.info("Processing: %s", filename)
|
|
|
|
|
|
|
|
# def ebook(url, author):
|
|
# destination = f"/ebooks/{author}"
|
|
# os.makedirs(destination, exist_ok=True) # Create the folder if it doesn't exist
|
|
|
|
# response = requests.get(url, stream=True)
|
|
# response.raise_for_status() # Good practice to raise error on bad status
|
|
|
|
# # Try to extract filename from the headers
|
|
# cd = response.headers.get('Content-Disposition')
|
|
# if cd and 'filename=' in cd:
|
|
# filename = cd.split('filename=')[1].strip('";')
|
|
# else:
|
|
# # Fallback: get the last part of the URL
|
|
# filename = os.path.basename(url)
|
|
|
|
# file_path = os.path.join(destination, filename)
|
|
|
|
# with open(file_path, 'wb') as f:
|
|
# for chunk in response.iter_content(chunk_size=8192):
|
|
# f.write(chunk)
|
|
|
|
class grab():
    """Scraping helpers: season lists, poster images and video thumbnails."""

    @staticmethod
    def season(url):
        """Return the season identifiers offered on a show page.

        Parses the page's season ``<select class="js-switch-season">`` and
        strips the ``<url>/season:`` prefix from each option value.

        Args:
            url: the show's page URL.

        Returns:
            List of season id strings (e.g. ['1', '2', '3']).

        Raises:
            requests.HTTPError: if the page request fails.
            AttributeError: if the page has no season selector.
        """
        page_html = requests.get(url)
        page_html.raise_for_status()  # fail fast instead of parsing an error page
        soup = BeautifulSoup(page_html.text, 'html.parser')
        select_element = soup.find('select', class_='js-switch-season')
        options = select_element.find_all('option')
        option_values = [option['value'] for option in options if option.has_attr('value')]
        seasons = [item.replace(url + '/season:', '') for item in option_values]
        return seasons

    @staticmethod
    def poster(url, name, force_download, save_dir='/data/posters/'):
        """Download a poster image and return its local file path.

        The filename is derived from ``name`` (non-alphanumerics removed,
        spaces -> underscores) plus the URL path's extension (``.jpeg`` when
        the URL has none). An existing file is reused unless
        ``force_download`` is truthy.

        Args:
            url: image URL.
            name: display name used to build the filename (e.g. img alt text).
            force_download: re-download even if the file already exists.
            save_dir: destination directory; created if missing.

        Returns:
            Path of the (possibly pre-existing) image file.
        """
        path = urlsplit(url).path
        ext = os.path.splitext(path)[-1] or '.jpeg'

        safe_name = re.sub(r'[^a-zA-Z0-9\s]', '', name).replace(' ', '_')
        filename = f"{safe_name}{ext}"
        filepath = os.path.join(save_dir, filename)

        if not os.path.exists(filepath) or force_download:
            os.makedirs(save_dir, exist_ok=True)
            response = requests.get(url)
            # Don't save an HTML error page to disk as an "image".
            response.raise_for_status()
            with open(filepath, 'wb') as handler:
                handler.write(response.content)

        return filepath

    @staticmethod
    def thumbnail(ydl, url, location):
        """Fetch a video's thumbnail via yt-dlp metadata and save it as <id>.jpg.

        Args:
            ydl: an open ``yt_dlp.YoutubeDL`` instance.
            url: video URL to probe (metadata only, nothing is downloaded).
            location: directory the thumbnail is written into.
        """
        # Extract video metadata without downloading the video itself.
        video_info = ydl.extract_info(url, download=False)
        thumbnail_url = video_info.get('thumbnail')

        if not thumbnail_url:
            print("Downloaded MP4 but no thumbnail found.")
            return

        try:
            thumbnail_filename = os.path.join(location, f"{video_info['id']}.jpg")
            with open(thumbnail_filename, 'wb') as thumbnail_file:
                thumbnail_file.write(requests.get(thumbnail_url).content)
            print("Downloaded MP4 and downloaded thumbnail successfully!")
        except Exception as e:
            print(f"Error downloading thumbnail: {str(e)}")
|
|
|
|
class ArchiveOnlyYDL(yt_dlp.YoutubeDL):
    """YoutubeDL subclass that records entries in the download archive
    WITHOUT downloading anything -- used to mark episodes as already owned."""

    def process_info(self, info_dict):
        # Bypass the normal download pipeline entirely: write the entry to
        # the archive file (yt-dlp's 'download_archive' option) and report it.
        self.record_download_archive(info_dict)
        self.to_screen(f"Archived: {info_dict.get('title')}")
        return info_dict
|
|
|
|
class dropout():
    """Downloaders for watch.dropout.tv: archive marking, ad-hoc downloads,
    per-season episode downloads, and series-catalogue scraping.

    NOTE(review): methods lack @staticmethod / self; they only work when
    called via the class (e.g. ``dropout.show(...)``) -- confirm callers.
    """

    def archive(show, season):
        """Mark every entry of one season as downloaded without downloading.

        Looks the show's URL up in /data/dropout.json and feeds the season
        playlist to ArchiveOnlyYDL, which only writes the archive log.
        """
        with open('/data/dropout.json', 'r') as json_file:
            url_mapping = json.load(json_file)

        # url_mapping is a list of {'SHOW': ..., 'URL': ...} dicts (see series()).
        url = next((item['URL'] for item in url_mapping if item['SHOW'] == show), None)
        if url is None:
            raise ValueError(f"Show '{show}' not found in the JSON data.")

        playlist_url = f'{url}/season:{season}'

        dl_opts = {
            'quiet': True,
            'cookiefile': '/data/dropout.cookies.txt',
            'download_archive': '/data/logs/dropout.archive.log',
            'skip_download': True,  # Prevent actual downloads
        }

        with ArchiveOnlyYDL(dl_opts) as ydl:
            ydl.download([playlist_url])

    def custom(url, directory, prefix):
        """Download arbitrary URL(s) into *directory* with Dropout cookies.

        Args:
            url: a single URL string or a list of URLs.
            directory: destination directory (yt-dlp 'home' path).
            prefix: optional string prepended to each output filename.
        """
        filename_template = f"{prefix}%(title)s.%(ext)s" if prefix else "%(title)s.%(ext)s"

        dl_opts = {
            'progress_hooks': [my_hook],
            'download_archive': '/data/logs/dropout.archive.log',
            'format': 'bestvideo+bestaudio/best',
            'audio_quality': '256K',
            'paths': {
                'temp': '/temp',
                'home': directory,
            },
            'cookiefile': '/data/dropout.cookies.txt',
            'writesubtitles': True,
            'subtitleslangs': ['en'],
            'outtmpl': filename_template,
        }

        with yt_dlp.YoutubeDL(dl_opts) as ydl:
            # Accept either a single URL or a list of URLs.
            ydl.download([url] if isinstance(url, str) else url)

    def show(show, season, specials=False, episode_start=None):
        """Download one season of a show into /tv/<show>/Season <n>.

        Episodes are numbered sequentially from *episode_start* (default 1)
        in playlist order. With specials=True only behind-the-scenes /
        trailer / recap / last-looks titles are kept (saved as S00);
        otherwise those titles are excluded.
        """
        season_str = f"{int(season):02}" if not specials else "00"
        directory = f"/tv/{show}/{'Specials' if specials else f'Season {season}'}"
        os.makedirs(directory, exist_ok=True)

        with open('/data/dropout.json', 'r') as json_file:
            url_mapping = json.load(json_file)

        url = next((item['URL'] for item in url_mapping if item['SHOW'] == show), None)
        if url is None:
            raise ValueError(f"Show '{show}' not found in the JSON data.")

        playlist_url = f'{url}/season:{season}'

        # Match filter logic: yt-dlp's '~=' keeps titles matching the regex
        # (specials mode), '!~=' drops them (regular-episode mode).
        filter_pattern = (
            "title "
            f"{'~=' if specials else '!~='} "
            r"'(?i).*behind.?the.?scenes.*"
            r"|.*trailer.*"
            r"|.*recap.*"
            r"|.*last.looks.*'"
        )
        match_filter = yt_dlp.utils.match_filter_func(filter_pattern)

        ydl_opts = {
            'quiet': True,
            'skip_download': True,
            'cookiefile': '/data/dropout.cookies.txt',
        }

        # Extract playlist info (metadata only) so episodes can be numbered
        # before anything is downloaded.
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            playlist_info = ydl.extract_info(playlist_url, download=False)

        entries = playlist_info.get('entries', [])
        # match_filter returns None for entries that pass the filter.
        filtered_entries = [entry for entry in entries if match_filter(entry) is None]

        # NOTE(review): falsy check means episode_start=0 also becomes 1.
        episode_start = int(episode_start) if episode_start else 1

        for i, entry in enumerate(filtered_entries, start=episode_start):
            episode_number = f"{i:02}"
            filename_template = f"{show} - S{season_str}E{episode_number} - %(title)s.%(ext)s"

            dl_opts = {
                'progress_hooks': [my_hook],
                'download_archive': '/data/logs/dropout.archive.log',
                'format': 'bestvideo+bestaudio/best',
                'audio_quality': '256K',
                'paths': {
                    'temp': '/temp',
                    'home': directory
                },
                'cookiefile': '/data/dropout.cookies.txt',
                'writesubtitles': True,
                'subtitleslangs': ['en'],
                'outtmpl': filename_template,
            }

            # One yt-dlp run per episode so each gets its own SxxEyy template.
            with yt_dlp.YoutubeDL(dl_opts) as ydl:
                ydl.download([entry['webpage_url']])

    def series(force_download):
        """Rebuild /data/dropout.json from the dropout.tv series index.

        Scrapes every show tile, downloads each poster via grab.poster, and
        writes a list of {SHOW, URL, LINK, POSTER} dicts.

        Args:
            force_download: passed through to grab.poster to re-fetch posters.
        """
        json_data=[]
        html=requests.get('https://watch.dropout.tv/series').text

        # Parse the series index page for show tiles.
        soup = BeautifulSoup(html, 'html.parser')
        elements = soup.find_all('a', class_='browse-item-link')

        shows = []
        for element in elements:
            show_data = {}
            show_data['href'] = element.get('href', '')

            img = element.find('img')
            if img:
                show_data['src'] = img.get('src', '')
                show_data['alt'] = img.get('alt', '')

            shows.append(show_data)

        # 'shows' is a list of dicts: href (show URL), src (poster), alt (title).
        for show in shows:
            info_data = {}
            info_data['SHOW'] = show.get('alt', 'No title')
            info_data['URL'] = show.get('href', 'No link')
            # LINK is the URL with the scheme/host prefix stripped.
            info_data['LINK'] = re.sub(r".*dropout.tv/", "", show.get('href', ''))
            info_data['POSTER'] = grab.poster(show.get('src', ''), show.get('alt', ''),force_download)
            json_data.append(info_data)

        # Sort the json_data by the 'SHOW' key
        # sorted_json_data = sorted(json_data, key=lambda x: x['SHOW'])
        with open('/data/dropout.json', 'w') as json_file:
            json.dump(json_data, json_file, indent=4)
|
|
|
|
class youtube():
    """yt-dlp wrapper for general downloads (YouTube, podcasts, ASMR)."""

    def ydl(url, location):
        """Download *url* into *location*/<uploader>/<title>.<ext>.

        The '/podcasts' and '/asmr' destinations are extracted to 192k mp3
        with metadata; '/youtube' adds browser cookies; everything else gets
        the default mp4 video format.

        Args:
            url: the video/audio URL to download.
            location: destination root directory (yt-dlp 'home' path).
        """
        # Lazy %-args per logging best practice (was an eager f-string).
        logger.info('message=Received download request for %s.', url)

        dl_ops = {
            'progress_hooks': [my_hook],
            'download_archive': '/data/logs/youtube.archive.log',
            'paths': {
                'temp': '/temp',
                'home': location
            },
            'outtmpl': '%(uploader)s/%(title)s.%(ext)s'
        }

        home = dl_ops['paths']['home']
        if home in ('/podcasts', '/asmr'):
            # Audio-only destinations share identical handling; previously
            # two copy-pasted branches.
            dl_ops['format'] = 'bestaudio/best[ext=mp3]'
            dl_ops['postprocessors'] = [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }, {
                'key': 'FFmpegMetadata',
                'add_metadata': True,
            }]
        elif home == '/youtube':
            dl_ops['format'] = 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best'
            # YouTube needs cookies for age-gated / members content.
            dl_ops['cookiefile'] = '/data/youtube.cookies.txt'
        else:
            dl_ops['format'] = 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best'

        with yt_dlp.YoutubeDL(dl_ops) as ydl:
            ydl.download([url])
            # grab.thumbnail(ydl,url,location)