renamarr/core/sonarr_radarr_helper.py
2026-01-01 12:59:14 -05:00

321 lines
14 KiB
Python

"""
Integration with Sonarr/Radarr for rolling rename functionality.
Updates episode/movie release groups via API.
"""
import requests
import json
from pathlib import Path
from typing import Optional, Dict
from core.logger_helper import setup_logger
# Module-level logger used by every method of SonarrRadarrHelper. setup_logger
# is handed the "logs" directory that lives in the parent of this file's own
# directory (presumably where log files are written -- confirm in logger_helper).
logger = setup_logger(Path(__file__).parent.parent / "logs")
class SonarrRadarrHelper:
    """Small client for the Sonarr and Radarr v3 HTTP APIs.

    The helper downloads the series/movie lists into in-memory caches (also
    mirrored to JSON files in the ``cache`` directory), maps local Windows
    folders onto the paths known to Sonarr/Radarr, and posts rename commands.
    All network-facing methods are best-effort: failures are logged and
    reported through the return value instead of being raised.
    """

    # Seconds to wait for any single HTTP request before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self, sonarr_url: Optional[str] = None, sonarr_api_key: Optional[str] = None,
                 radarr_url: Optional[str] = None, radarr_api_key: Optional[str] = None,
                 path_mappings: Optional[list] = None):
        """Initialize Sonarr/Radarr API clients.

        Args:
            sonarr_url: Base URL like http://10.0.0.10:8989 (without /api/v3)
            sonarr_api_key: API key sent in the X-Api-Key header to Sonarr
            radarr_url: Base URL like http://10.0.0.10:7878 (without /api/v3)
            radarr_api_key: API key sent in the X-Api-Key header to Radarr
            path_mappings: List of dicts with 'from' (Windows) and 'to' (Linux) keys
        """
        self.sonarr_url = self._build_api_url(sonarr_url)
        self.sonarr_api_key = sonarr_api_key
        self.radarr_url = self._build_api_url(radarr_url)
        self.radarr_api_key = radarr_api_key
        self.path_mappings = path_mappings or []

        # Cache for series and movies (None until the matching load_* succeeds)
        self.sonarr_cache = None
        self.radarr_cache = None
        cache_dir = Path(__file__).parent.parent / "cache"
        self.cache_file_sonarr = cache_dir / "sonarr_cache.json"
        self.cache_file_radarr = cache_dir / "radarr_cache.json"

    @staticmethod
    def _build_api_url(base_url: Optional[str]) -> Optional[str]:
        """Return ``<base_url>/api/v3`` or None when no base URL is configured."""
        if not base_url:
            return None
        # Strip the trailing slash from the *base* URL before appending, so that
        # "http://host:8989/" does not turn into "http://host:8989//api/v3".
        return f"{str(base_url).rstrip('/')}/api/v3"

    @staticmethod
    def _write_json(target: Path, payload) -> None:
        """Write ``payload`` as indented JSON to ``target``, creating parent dirs."""
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(target, 'w', encoding="utf-8") as f:
            json.dump(payload, f, indent=2)

    @staticmethod
    def _is_season_folder(folder_name: str) -> bool:
        """Return True for season subfolder names such as 'Season 13' or 'Specials'."""
        lowered = folder_name.lower()
        if lowered == "specials":
            return True
        if not lowered.startswith("season"):
            return False
        # Require a season number after the word so that series folders like
        # "Seasonal Show" are not mistaken for a season subfolder.
        return lowered[len("season"):].lstrip(" ._-")[:1].isdigit()

    @staticmethod
    def _path_contains_folder(file_path: str, folder: str) -> bool:
        """Return True when ``folder`` occurs in ``file_path`` as whole path components."""
        folder_core = folder.strip("/").lower()
        file_core = file_path.strip("/").lower()
        if not folder_core or not file_core:
            return False
        # Wrapping both sides in "/" forces the match onto component boundaries:
        # "/movies/Alien" must not match "/movies/Aliens (1986)/Aliens.mkv".
        return f"/{folder_core}/" in f"/{file_core}/"

    def _convert_to_linux_path(self, windows_path: str) -> str:
        """Convert Windows path to Linux path using path_mappings.

        Args:
            windows_path: Path as seen locally; backslashes are accepted.

        Returns:
            The path with forward slashes, rewritten by the first mapping whose
            'from' prefix matches (case-insensitively, on a path-component
            boundary). When nothing matches, the slash-normalized input.
        """
        windows_path = str(windows_path).replace("\\", "/")

        if not self.path_mappings:
            logger.debug(f"No path mappings configured, returning path as-is: {windows_path}")
            return windows_path

        lowered_path = windows_path.lower()
        for mapping in self.path_mappings:
            if not isinstance(mapping, dict):
                # Skip invalid mapping entries
                logger.warning(f"Invalid path mapping (not a dict): {mapping}")
                continue

            from_path = str(mapping.get("from", "")).replace("\\", "/").lower()
            to_path = mapping.get("to", "")
            if not from_path or not to_path:
                continue

            prefix = from_path.rstrip("/")
            if not lowered_path.startswith(prefix):
                continue
            relative_path = windows_path[len(prefix):]
            # The prefix must end on a path separator (or consume the whole
            # path); otherwise a mapping for "P:/tv" would also claim "P:/tv2".
            if relative_path and not relative_path.startswith("/"):
                continue

            linux_path = str(to_path).rstrip("/") + "/" + relative_path.lstrip("/")
            logger.debug(f"Path conversion: {windows_path} -> {linux_path}")
            return linux_path

        # No mapping found, return as-is (already converted to /)
        logger.debug(f"No path mapping found for: {windows_path}")
        return windows_path

    def load_sonarr_cache(self) -> bool:
        """Load and cache all Sonarr series data.

        Fetches ``/series``, keeps id/title/path of every series in
        ``self.sonarr_cache`` and mirrors the list to ``cache_file_sonarr``.

        Returns:
            True if cache loaded successfully, False otherwise
        """
        if not self.sonarr_url or not self.sonarr_api_key:
            logger.warning("Sonarr API not configured")
            return False

        try:
            print("📡 Fetching Sonarr series cache...")
            response = requests.get(
                f"{self.sonarr_url}/series",
                headers={"X-Api-Key": self.sonarr_api_key},
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()

            cache_data = [
                {
                    "type": "sonarr",
                    "id": series.get("id"),
                    "title": series.get("title", "Unknown"),
                    "path": series.get("path", ""),
                }
                for series in response.json()
            ]
            self.sonarr_cache = cache_data
            self._write_json(self.cache_file_sonarr, cache_data)

            print(f"✓ Sonarr cache loaded: {len(cache_data)} series")
            logger.info(f"Sonarr cache loaded: {len(cache_data)} series")
            return True
        except Exception as e:
            # Deliberately broad: loading the cache is best-effort.
            logger.warning(f"Error loading Sonarr cache: {e}")
            return False

    def load_radarr_cache(self) -> bool:
        """Load and cache all Radarr movies data.

        Fetches ``/movie`` and keeps one entry per movie that has a file on
        disk in ``self.radarr_cache``; the list is mirrored to
        ``cache_file_radarr``.

        Returns:
            True if cache loaded successfully, False otherwise
        """
        if not self.radarr_url or not self.radarr_api_key:
            logger.warning("Radarr API not configured")
            return False

        try:
            print("📡 Fetching Radarr movies cache...")
            response = requests.get(
                f"{self.radarr_url}/movie",
                headers={"X-Api-Key": self.radarr_api_key},
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()

            cache_data = []
            for movie in response.json():
                movie_file = movie.get("movieFile") or {}
                movie_file_path = movie_file.get("path", "")
                if not movie_file_path:
                    continue
                # Only normalize separators. The path belongs to the Radarr
                # host, so resolving it against the local filesystem would
                # corrupt it (drive letter / cwd / symlinks of *this* machine).
                movie_file_path = str(movie_file_path).replace("\\", "/")
                cache_data.append({
                    "type": "radarr",
                    "movie_id": movie.get("id"),
                    "movie_file_id": movie_file.get("id"),
                    "title": movie.get("title", "Unknown"),
                    "year": movie.get("year"),
                    "file_path": movie_file_path,
                    "quality_profile": movie.get("qualityProfileId"),
                })

            self.radarr_cache = cache_data
            self._write_json(self.cache_file_radarr, cache_data)

            print(f"✓ Radarr cache loaded: {len(cache_data)} movies")
            logger.info(f"Radarr cache loaded: {len(cache_data)} movies")
            return True
        except Exception as e:
            # Deliberately broad: loading the cache is best-effort.
            logger.warning(f"Error loading Radarr cache: {e}")
            return False

    def _fetch_series_episodes(self, series_id, series_title) -> list:
        """Fetch the episodes of one series, attach file details, save a temp cache.

        Returns the episode list from Sonarr (each episode that has a file gets
        an ``episodeFile`` entry), or an empty list when Sonarr is not
        configured or the episode request fails.
        """
        if not self.sonarr_url or not self.sonarr_api_key:
            return []

        episodes = []
        try:
            headers = {"X-Api-Key": self.sonarr_api_key}
            ep_response = requests.get(
                f"{self.sonarr_url}/episode",
                params={"seriesId": series_id},
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            ep_response.raise_for_status()
            episodes = ep_response.json()

            # Multi-episode files share one episodeFileId; remember what was
            # already downloaded so each file is requested only once.
            fetched_files = {}
            for episode in episodes:
                file_id = episode.get("episodeFileId")
                if not (episode.get("hasFile") and file_id):
                    continue
                if file_id not in fetched_files:
                    try:
                        file_response = requests.get(
                            f"{self.sonarr_url}/episodefile/{file_id}",
                            headers=headers,
                            timeout=self.REQUEST_TIMEOUT,
                        )
                        file_response.raise_for_status()
                        fetched_files[file_id] = file_response.json()
                    except Exception as e:
                        # One missing file must not abort the whole series.
                        logger.debug(f"Error fetching episode file {file_id}: {e}")
                        continue
                # Add file details (including its path) to the episode
                episode["episodeFile"] = fetched_files[file_id]

            logger.info(f"Fetched {len(episodes)} episodes for {series_title}")
            print(f"📡 Fetched {len(episodes)} episodes")

            # Save to temp cache
            cache_file = Path(__file__).parent.parent / "cache" / "temp_episodes.json"
            self._write_json(cache_file, {
                "series_id": series_id,
                "series_title": series_title,
                "total_episodes": len(episodes),
                "episodes": episodes,
            })
            logger.info(f"Saved episodes to {cache_file}")
        except Exception as e:
            logger.error(f"Error fetching episodes: {e}")
        return episodes

    def find_series_by_folder(self, folder_path: str) -> Optional[Dict]:
        """Find series in cache by folder path and fetch episodes.

        The Sonarr cache is searched first (exact, case-insensitive match on
        the series path); the Radarr cache is searched afterwards for a movie
        file located inside the folder.

        Args:
            folder_path: Windows folder path (e.g., P:\\tv\\Supernatural or P:\\tv\\Supernatural\\Season 13)

        Returns:
            Dict with series info and episode count if a series matched, the
            cached movie dict if a movie matched, None otherwise
        """
        # Convert Windows folder path to Linux path
        windows_path = str(folder_path)
        linux_path = self._convert_to_linux_path(windows_path)
        # Just normalize separators, don't resolve
        linux_path = linux_path.replace("\\", "/").rstrip("/")
        logger.info(f"Input folder: {windows_path}")
        logger.info(f"Converted to: {linux_path}")

        # Remove Season subfolder if present (e.g., /path/Supernatural/Season 13 -> /path/Supernatural)
        # This handles cases where a season subfolder is passed instead of the series root
        path_parts = linux_path.split("/")
        if self._is_season_folder(path_parts[-1]):
            linux_path = "/".join(path_parts[:-1])
            logger.info(f"Stripped season folder, searching for: {linux_path}")

        # An empty path would otherwise "match" entries with empty paths.
        if not linux_path:
            logger.info(f"No series found for: {linux_path}")
            return None

        wanted = linux_path.lower()

        # Search Sonarr cache for matching series path
        for series in self.sonarr_cache or []:
            series_path = (series.get("path") or "").rstrip("/")
            # Compare on forward slashes so a Windows-hosted Sonarr still matches.
            if wanted != series_path.replace("\\", "/").rstrip("/").lower():
                continue

            series_id = series.get("id")
            series_title = series.get("title")
            logger.info(f"✓ Found series: {series_title} (ID: {series_id})")
            episodes = self._fetch_series_episodes(series_id, series_title)
            return {
                "type": "sonarr",
                "id": series_id,
                "title": series_title,
                "path": series_path,
                "episode_count": len(episodes),
            }

        # Search Radarr cache: the folder has to be a directory of the movie file
        for item in self.radarr_cache or []:
            item_path = (item.get("file_path") or "").replace("\\", "/")
            if self._path_contains_folder(item_path, linux_path):
                logger.info(f"✓ Found movie: {item.get('title')} ({item.get('year')})")
                return item

        logger.info(f"No series found for: {linux_path}")
        return None

    def trigger_sonarr_rename(self, series_id: int, episode_file_id: int) -> bool:
        """Trigger Sonarr to rename an episode file.

        Posts a ``RenameFiles`` command to ``/command``.

        Args:
            series_id: Sonarr series ID
            episode_file_id: Episode file ID to rename

        Returns:
            True if successful, False otherwise
        """
        if not self.sonarr_url or not self.sonarr_api_key:
            logger.warning("Sonarr not configured")
            return False

        try:
            response = requests.post(
                f"{self.sonarr_url}/command",
                headers={"X-Api-Key": self.sonarr_api_key},
                json={
                    "name": "RenameFiles",
                    "seriesId": series_id,
                    "files": [episode_file_id],
                },
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return True
        except Exception as e:
            logger.error(f"Error triggering Sonarr rename: {e}")
            return False

    def trigger_radarr_rename(self, movie_file_id: int) -> bool:
        """Trigger Radarr to rename a movie file.

        Posts a ``RenameMovie`` command to ``/command``.

        Args:
            movie_file_id: Movie file ID to rename

        Returns:
            True if successful, False otherwise
        """
        if not self.radarr_url or not self.radarr_api_key:
            logger.warning("Radarr not configured")
            return False

        try:
            # NOTE(review): Radarr's RenameMovie command appears to take
            # "movieIds", while file-level renames go through "RenameFiles"
            # with "movieId" + "files" -- confirm this payload against the
            # Radarr API before relying on it.
            response = requests.post(
                f"{self.radarr_url}/command",
                headers={"X-Api-Key": self.radarr_api_key},
                json={
                    "name": "RenameMovie",
                    "movieFileIds": [movie_file_id],
                },
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return True
        except Exception as e:
            logger.error(f"Error triggering Radarr rename: {e}")
            return False