"""
|
|
Rolling rename script - Updates episode release groups one at a time with delays.
|
|
Useful for staggering Sonarr/Radarr renames to avoid overwhelming the API or filesystem.
|
|
"""
|
|
|
|
import argparse
|
|
import time
|
|
import json
|
|
import requests
|
|
from pathlib import Path
|
|
from core.config_helper import load_config_xml
|
|
from core.sonarr_radarr_helper import SonarrRadarrHelper
|
|
from core.logger_helper import setup_logger
|
|
|
|
logger = setup_logger(Path("logs"))
|
|
|
|
|
|


def convert_server_path_to_local(server_path: str, path_mappings: list) -> str:
    """Convert server path (Linux) back to local path (Windows/Mac/Linux).

    Useful when user provides a path from Sonarr/Radarr server instead of local path.
    """
    if not path_mappings:
        return server_path

    # Normalize the input path
    server_path = str(server_path).replace("\\", "/")

    # Try to find reverse mapping (server path -> local path)
    for mapping in path_mappings:
        if isinstance(mapping, dict):
            to_path = mapping.get("to", "").replace("\\", "/").rstrip("/")
            from_path = mapping.get("from", "")

            # Check if server_path starts with the "to" (server) path
            if server_path.lower().startswith(to_path.lower()):
                relative = server_path[len(to_path):].lstrip("/")
                # Convert back to Windows path format
                result = (from_path + "\\" + relative).replace("/", "\\") if relative else from_path
                return result

    return server_path


def rolling_rename_series(folder_path: str, wait_seconds: int = 20, season: int = None, sr_helper: SonarrRadarrHelper = None):
    """
    Rename episodes in a series one at a time with delays.

    Args:
        folder_path: Path to series folder (e.g., P:\\tv\\Supernatural)
        wait_seconds: Seconds to wait between renames (default: 20)
        season: Optional season number to target (default: None for all seasons)
        sr_helper: SonarrRadarrHelper instance (optional, created if not provided)
    """
    if sr_helper is None:
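        # config.xml is expected to provide (based on the keys read below):
        #   services/sonarr/url, services/sonarr/api_key,
        #   services/radarr/url, services/radarr/api_key,
        #   and a path_mappings list of {"from": ..., "to": ...} entries.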
        config = load_config_xml(Path("config.xml"))
        sonarr_config = config.get("services", {}).get("sonarr", {})
        radarr_config = config.get("services", {}).get("radarr", {})
        path_mappings = config.get("path_mappings", [])

        sr_helper = SonarrRadarrHelper(
            sonarr_url=sonarr_config.get("url"),
            sonarr_api_key=sonarr_config.get("api_key"),
            radarr_url=radarr_config.get("url"),
            radarr_api_key=radarr_config.get("api_key"),
            path_mappings=path_mappings
        )

    folder = Path(folder_path)

    # If folder doesn't exist locally, try converting from server path
    if not folder.is_dir():
        config = load_config_xml(Path("config.xml"))
        path_mappings = config.get("path_mappings", [])
        converted_path = convert_server_path_to_local(folder_path, path_mappings)
        folder = Path(converted_path)

    if not folder.is_dir():
        logger.error(f"Folder not found: {folder}")
        return

    # Load caches first
    logger.info("Loading Sonarr/Radarr caches...")
    sr_helper.load_sonarr_cache()
    sr_helper.load_radarr_cache()

    # Find series
    logger.info(f"Finding series for: {folder}")
    series_info = sr_helper.find_series_by_folder(str(folder))

    if not series_info:
        logger.error("Series not found in Sonarr/Radarr")
        return

    series_type = series_info.get("type", "sonarr")
    series_id = series_info.get("id")
    series_title = series_info.get("title")
    episode_count = series_info.get("episode_count", 0)

    logger.info(f"✓ Found {series_type.upper()} series: {series_title} (ID: {series_id}) - {episode_count} episodes")
    logger.info(f" Path: {folder}")
    logger.info(f"Will rename {episode_count} episodes with {wait_seconds} second(s) between each")

    # Load temp cache with episodes
    cache_file = Path("cache") / "temp_episodes.json"
    if not cache_file.exists():
        logger.error(f"Episode cache not found: {cache_file}")
        return

    try:
        with open(cache_file, 'r') as f:
            cache_data = json.load(f)
        episodes = cache_data.get("episodes", [])
    except Exception as e:
        logger.error(f"Error reading episode cache: {e}")
        return

    if not episodes:
        logger.warning("No episodes found in cache")
        return

    # Sort episodes by season and episode number
    episodes.sort(key=lambda x: (x.get("seasonNumber", 0), x.get("episodeNumber", 0)))

    # Filter: only episodes with actual files (hasFile: true) and skip season 0
    episodes = [ep for ep in episodes if ep.get("hasFile") and ep.get("seasonNumber", 0) > 0]

    # Filter by season if specified
    if season:
        episodes = [ep for ep in episodes if ep.get("seasonNumber") == season]
        logger.info(f"Filtering to season {season}")

    if not episodes:
        logger.warning("No episodes with files found (excluding specials)")
        return
logger.info(f"Starting rolling rename of {len(episodes)} episodes...\n")
|
|
|
|
# Create progress tracking file
|
|
progress_file = Path("logs") / "rolling_rename_progress.json"
|
|
progress_file.parent.mkdir(parents=True, exist_ok=True)
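    # Progress file shape (as written in the loop below):
    #   {"series_id": ..., "series_title": ..., "total_episodes": ...,
    #    "completed_episodes": ..., "completed": {"<episode_id>": {...}}}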

    # Load existing progress
    completed = {}
    if progress_file.exists():
        try:
            with open(progress_file, 'r') as f:
                progress_data = json.load(f)
            completed = progress_data.get("completed", {})
            logger.info(f"Found {len(completed)} previously completed episodes")
        except Exception as e:
            logger.warning(f"Could not load progress file: {e}")

    # Filter out already completed episodes
    remaining_episodes = [ep for ep in episodes if str(ep.get("id")) not in completed]

    if not remaining_episodes:
        logger.info("✓ All episodes have already been renamed!")
        return

    logger.info(f"Starting rolling rename of {len(remaining_episodes)} episodes...\n")
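
    # For each remaining episode: look up its file ID, trigger a rename through the
    # Sonarr/Radarr helper, record it in the progress file, then sleep before the
    # next one so renames trickle in instead of hitting the API all at once.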
    for idx, episode in enumerate(remaining_episodes, 1):
        season_num = episode.get("seasonNumber")
        ep_num = episode.get("episodeNumber")
        title = episode.get("title", "Unknown")
        episode_id = episode.get("id")

        logger.info(f"[{idx}/{len(remaining_episodes)}] {series_title} - S{season_num:02d}E{ep_num:02d} - {title}")

        try:
            if series_type == "sonarr":
                # Get episode file ID
                headers = {"X-Api-Key": sr_helper.sonarr_api_key}
                episode_url = f"{sr_helper.sonarr_url}/episode/{episode_id}"
                response = requests.get(episode_url, headers=headers, timeout=10)
                response.raise_for_status()
                ep_data = response.json()

                # Trigger rename
                if sr_helper.trigger_sonarr_rename(series_id, ep_data.get("episodeFileId")):
                    logger.info(f" ✓ Rename triggered for {series_title} S{season_num:02d}E{ep_num:02d}")
                    completed[str(episode_id)] = {
                        "season": season_num,
                        "episode": ep_num,
                        "title": title,
                        "timestamp": time.time()
                    }
                else:
                    logger.warning(" ✗ Failed to trigger rename")
            else:
                # Get movie file ID
                headers = {"X-Api-Key": sr_helper.radarr_api_key}
                movie_url = f"{sr_helper.radarr_url}/movie/{episode_id}"
                response = requests.get(movie_url, headers=headers, timeout=10)
                response.raise_for_status()
                movie_data = response.json()

                # Trigger rename
                movie_file_id = movie_data.get("movieFile", {}).get("id")
                if movie_file_id and sr_helper.trigger_radarr_rename(movie_file_id):
                    logger.info(f" ✓ Rename triggered for {series_title}")
                    completed[str(episode_id)] = {
                        "season": season_num,
                        "episode": ep_num,
                        "title": title,
                        "timestamp": time.time()
                    }
                else:
                    logger.warning(" ✗ Failed to trigger rename")
        except Exception as e:
            logger.warning(f" ✗ Error triggering rename: {e}")

        # Save progress after each update
        try:
            with open(progress_file, 'w') as f:
                json.dump({
                    "series_id": series_id,
                    "series_title": series_title,
                    "total_episodes": len(episodes),
                    "completed_episodes": len(completed),
                    "completed": completed
                }, f, indent=2)
        except Exception as e:
            logger.warning(f"Could not save progress: {e}")

        # Wait before next update (except for last episode)
        if idx < len(remaining_episodes):
            logger.info(f" Waiting {wait_seconds} second(s) before next update...")
            time.sleep(wait_seconds)

    logger.info(f"\n✓ Rolling rename complete! {len(remaining_episodes)} episodes updated in {series_title}")


def main():
    parser = argparse.ArgumentParser(
        description="Rolling rename script - Updates episode release groups with delays",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python main.py "P:\\tv\\Supernatural"
  python main.py "P:\\tv\\Supernatural" --wait 300
  python main.py "P:\\tv\\Breaking Bad" -w 60
  python main.py "P:\\tv\\Supernatural" -s 5
  python main.py "P:\\tv\\Supernatural" --season 10 --wait 180
"""
    )

    parser.add_argument("folder", nargs="?", help="Path to series folder")
    parser.add_argument("-w", "--wait", type=int, default=20,
                        help="Seconds to wait between renames (default: 20)")
    parser.add_argument("-s", "--season", type=int, default=None,
                        help="Target specific season number (default: all seasons)")

    args = parser.parse_args()

    # If no folder provided, ask for it
    if not args.folder:
        args.folder = input("Enter series folder path: ").strip()

    if not args.folder:
        logger.error("No folder path provided")
        return

    rolling_rename_series(args.folder, args.wait, args.season)


if __name__ == "__main__":
    main()