""" Integration with Sonarr/Radarr for rolling rename functionality. Updates episode/movie release groups via API. """ import requests import json from pathlib import Path from typing import Optional, Dict from core.logger_helper import setup_logger logger = setup_logger(Path(__file__).parent.parent / "logs") class SonarrRadarrHelper: def __init__(self, sonarr_url: str = None, sonarr_api_key: str = None, radarr_url: str = None, radarr_api_key: str = None, path_mappings: list = None): """Initialize Sonarr/Radarr API clients. Args: sonarr_url: Base URL like http://10.0.0.10:8989 (without /api/v3) radarr_url: Base URL like http://10.0.0.10:7878 (without /api/v3) path_mappings: List of dicts with 'from' (Windows) and 'to' (Linux) keys """ self.sonarr_url = f"{sonarr_url}/api/v3".rstrip('/') if sonarr_url else None self.sonarr_api_key = sonarr_api_key self.radarr_url = f"{radarr_url}/api/v3".rstrip('/') if radarr_url else None self.radarr_api_key = radarr_api_key self.path_mappings = path_mappings or [] # Cache for series and movies self.sonarr_cache = None self.radarr_cache = None self.cache_file_sonarr = Path(__file__).parent.parent / "cache" / "sonarr_cache.json" self.cache_file_radarr = Path(__file__).parent.parent / "cache" / "radarr_cache.json" def _convert_to_linux_path(self, windows_path: str) -> str: """Convert Windows path to Linux path using path_mappings.""" windows_path = str(windows_path).replace("\\", "/") # Ensure path_mappings is a list if not self.path_mappings: logger.debug(f"No path mappings configured, returning path as-is: {windows_path}") return windows_path # Try to find matching mapping for mapping in self.path_mappings: # Safely extract from and to values if isinstance(mapping, dict): from_path = str(mapping.get("from", "")).replace("\\", "/").lower() to_path = mapping.get("to", "") else: # Skip invalid mapping entries logger.warning(f"Invalid path mapping (not a dict): {mapping}") continue if not from_path or not to_path: continue if windows_path.lower().startswith(from_path): # Replace the Windows portion with Linux portion relative_path = windows_path[len(from_path):] linux_path = to_path.rstrip("/") + "/" + relative_path.lstrip("/") logger.debug(f"Path conversion: {windows_path} → {linux_path}") return linux_path # No mapping found, return as-is (already converted to /) logger.debug(f"No path mapping found for: {windows_path}") return windows_path def load_sonarr_cache(self) -> bool: """Load and cache all Sonarr series data. Returns: True if cache loaded successfully, False otherwise """ if not self.sonarr_url or not self.sonarr_api_key: logger.warning("Sonarr API not configured") return False try: print("📡 Fetching Sonarr series cache...") headers = {"X-Api-Key": self.sonarr_api_key} series_url = f"{self.sonarr_url}/series" response = requests.get(series_url, headers=headers, timeout=10) response.raise_for_status() series_list = response.json() # Store series data cache_data = [] for series in series_list: cache_data.append({ "type": "sonarr", "id": series.get("id"), "title": series.get("title", "Unknown"), "path": series.get("path", ""), }) self.sonarr_cache = cache_data # Save to file self.cache_file_sonarr.parent.mkdir(parents=True, exist_ok=True) with open(self.cache_file_sonarr, 'w') as f: json.dump(cache_data, f, indent=2) print(f"✓ Sonarr cache loaded: {len(cache_data)} series") logger.info(f"Sonarr cache loaded: {len(cache_data)} series") return True except Exception as e: logger.warning(f"Error loading Sonarr cache: {e}") return False def load_radarr_cache(self) -> bool: """Load and cache all Radarr movies data. Returns: True if cache loaded successfully, False otherwise """ if not self.radarr_url or not self.radarr_api_key: logger.warning("Radarr API not configured") return False try: print("📡 Fetching Radarr movies cache...") headers = {"X-Api-Key": self.radarr_api_key} movie_url = f"{self.radarr_url}/movie" response = requests.get(movie_url, headers=headers, timeout=10) response.raise_for_status() movies = response.json() cache_data = [] for movie in movies: if "movieFile" in movie and movie["movieFile"]: movie_file_path = movie["movieFile"].get("path", "") if movie_file_path: movie_file_path = str(Path(movie_file_path).resolve()).replace("\\", "/") cache_data.append({ "type": "radarr", "movie_id": movie.get("id"), "title": movie.get("title", "Unknown"), "year": movie.get("year"), "file_path": movie_file_path, "quality_profile": movie.get("qualityProfileId"), }) self.radarr_cache = cache_data # Save to file self.cache_file_radarr.parent.mkdir(parents=True, exist_ok=True) with open(self.cache_file_radarr, 'w') as f: json.dump(cache_data, f, indent=2) print(f"✓ Radarr cache loaded: {len(cache_data)} movies") logger.info(f"Radarr cache loaded: {len(cache_data)} movies") return True except Exception as e: logger.warning(f"Error loading Radarr cache: {e}") return False def find_series_by_folder(self, folder_path: str) -> Optional[Dict]: """Find series in cache by folder path and fetch episodes. Args: folder_path: Windows folder path (e.g., P:\\tv\\Supernatural or P:\\tv\\Supernatural\\Season 13) Returns: Dict with series info and episode count if found, None otherwise """ # Convert Windows folder path to Linux path windows_path = str(folder_path) linux_path = self._convert_to_linux_path(windows_path) # Just normalize separators, don't resolve linux_path = linux_path.replace("\\", "/").rstrip("/") logger.info(f"Input folder: {windows_path}") logger.info(f"Converted to: {linux_path}") # Remove Season subfolder if present (e.g., /path/Supernatural/Season 13 -> /path/Supernatural) # This handles cases where a season subfolder is passed instead of the series root path_parts = linux_path.split("/") if path_parts and path_parts[-1].lower().startswith("season"): linux_path = "/".join(path_parts[:-1]) logger.info(f"Stripped season folder, searching for: {linux_path}") # Search Sonarr cache for matching series path if self.sonarr_cache: for series in self.sonarr_cache: series_path = series.get("path", "").rstrip("/") if linux_path.lower() == series_path.lower(): series_id = series.get("id") series_title = series.get("title") logger.info(f"✓ Found series: {series_title} (ID: {series_id})") # Fetch episodes from API episodes = [] try: if self.sonarr_url and self.sonarr_api_key: headers = {"X-Api-Key": self.sonarr_api_key} episode_url = f"{self.sonarr_url}/episode?seriesId={series_id}" ep_response = requests.get(episode_url, headers=headers, timeout=10) ep_response.raise_for_status() episodes = ep_response.json() # For each episode with a file, fetch the file details to get the path for episode in episodes: if episode.get("hasFile") and episode.get("episodeFileId"): try: file_id = episode.get("episodeFileId") file_url = f"{self.sonarr_url}/episodefile/{file_id}" file_response = requests.get(file_url, headers=headers, timeout=10) file_response.raise_for_status() file_data = file_response.json() # Add file path to episode episode["episodeFile"] = file_data except Exception as e: logger.debug(f"Error fetching episode file {file_id}: {e}") logger.info(f"Fetched {len(episodes)} episodes for {series_title}") print(f"📡 Fetched {len(episodes)} episodes") # Save to temp cache temp_cache = { "series_id": series_id, "series_title": series_title, "total_episodes": len(episodes), "episodes": episodes } cache_file = Path(__file__).parent.parent / "cache" / "temp_episodes.json" cache_file.parent.mkdir(parents=True, exist_ok=True) with open(cache_file, 'w') as f: json.dump(temp_cache, f, indent=2) logger.info(f"Saved episodes to {cache_file}") except Exception as e: logger.error(f"Error fetching episodes: {e}") return { "type": "sonarr", "id": series_id, "title": series_title, "path": series_path, "episode_count": len(episodes), } # Search Radarr cache if self.radarr_cache: for item in self.radarr_cache: item_path = item.get("file_path", "").rstrip("/") # For movies, check if folder matches the parent directory if linux_path.lower() == item_path.lower() or linux_path.lower() in item_path.lower(): logger.info(f"✓ Found movie: {item['title']} ({item['year']})") return item logger.info(f"No series found for: {linux_path}") return None def trigger_sonarr_rename(self, series_id: int, episode_file_id: int) -> bool: """Trigger Sonarr to rename an episode file. Args: series_id: Sonarr series ID episode_file_id: Episode file ID to rename Returns: True if successful, False otherwise """ if not self.sonarr_url or not self.sonarr_api_key: logger.warning("Sonarr not configured") return False try: headers = {"X-Api-Key": self.sonarr_api_key} cmd_url = f"{self.sonarr_url}/command" cmd_data = { "name": "RenameFiles", "seriesId": series_id, "files": [episode_file_id] } response = requests.post(cmd_url, headers=headers, json=cmd_data, timeout=10) response.raise_for_status() return True except Exception as e: logger.error(f"Error triggering Sonarr rename: {e}") return False def trigger_radarr_rename(self, movie_file_id: int) -> bool: """Trigger Radarr to rename a movie file. Args: movie_file_id: Movie file ID to rename Returns: True if successful, False otherwise """ if not self.radarr_url or not self.radarr_api_key: logger.warning("Radarr not configured") return False try: headers = {"X-Api-Key": self.radarr_api_key} cmd_url = f"{self.radarr_url}/command" cmd_data = { "name": "RenameMovie", "movieFileIds": [movie_file_id] } response = requests.post(cmd_url, headers=headers, json=cmd_data, timeout=10) response.raise_for_status() return True except Exception as e: logger.error(f"Error triggering Radarr rename: {e}") return False