# core/audio_handler.py """Audio stream detection, bitrate calculation, and codec selection.""" import json import os import subprocess import tempfile from pathlib import Path from core.logger_helper import setup_logger logger = setup_logger(Path(__file__).parent.parent / "logs") def calculate_stream_bitrate(input_file: Path, stream_index: int) -> int: """ Extract audio stream to temporary file using -c copy, capture bitrate from ffmpeg output. Returns bitrate in kbps. Falls back to 0 (and uses metadata) if extraction fails. Uses ffmpeg's reported bitrate which is more accurate than calculating from file size/duration. """ # Ensure input file exists and is readable input_file = Path(input_file) if not input_file.exists(): logger.error(f"Input file does not exist: {input_file}") return 0 if not os.access(input_file, os.R_OK): logger.error(f"Input file is not readable (permission denied): {input_file}") return 0 # Use project processing directory for temp files processing_dir = Path(__file__).parent.parent / "processing" processing_dir.mkdir(exist_ok=True) # Determine the codec of this audio stream first probe_cmd = [ "ffprobe", "-v", "error", "-select_streams", f"a:{stream_index}", "-show_entries", "stream=codec_name", "-of", "default=noprint_wrappers=1:nokey=1", str(input_file) ] try: probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False) codec_name = probe_result.stdout.strip().lower() if probe_result.stdout and probe_result.returncode == 0 else "aac" except: codec_name = "aac" # Use MKA (Matroska Audio) which supports any codec # This is a universal container that works with AC3, AAC, FLAC, DTS, Opus, etc. temp_ext = ".mka" temp_fd, temp_audio_path = tempfile.mkstemp(suffix=temp_ext, dir=str(processing_dir)) os.close(temp_fd) try: # Step 1: Extract audio stream with -c copy (lossless extraction) # ffmpeg outputs bitrate info to stderr extract_cmd = [ "ffmpeg", "-y", "-i", str(input_file), "-map", f"0:a:{stream_index}", "-c", "copy", temp_audio_path ] logger.debug(f"Extracting audio stream {stream_index} ({codec_name}) to temporary file for bitrate calculation...") result = subprocess.run(extract_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False) # Check if extraction succeeded if result.returncode != 0: logger.warning(f"Stream {stream_index}: ffmpeg extraction failed (return code {result.returncode})") if result.stderr: logger.debug(f"ffmpeg stderr: {result.stderr[:300]}") return 0 # Step 2: Parse bitrate from ffmpeg's output (stderr) # Look for line like: "bitrate= 457.7kbits/s" bitrate_kbps = 0 stderr_lines = result.stderr if result.stderr else "" for line in stderr_lines.split("\n"): if "bitrate=" in line: # Extract bitrate value from line like "size= 352162KiB time=01:45:03.05 bitrate= 457.7kbits/s" parts = line.split("bitrate=") if len(parts) > 1: bitrate_str = parts[1].strip().split("kbits/s")[0].strip() try: bitrate_kbps = int(float(bitrate_str)) logger.debug(f"Stream {stream_index}: Extracted bitrate from ffmpeg output: {bitrate_kbps} kbps") break except ValueError: continue # If we couldn't parse bitrate from output, fall back to calculation if bitrate_kbps == 0: logger.debug(f"Stream {stream_index}: Could not parse bitrate from ffmpeg output, calculating from file size...") file_size_bytes = os.path.getsize(temp_audio_path) # Get duration using ffprobe duration_cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1:noprint_wrappers=1", temp_audio_path ] duration_result = subprocess.run(duration_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False) try: duration_seconds = float(duration_result.stdout.strip()) if duration_result.stdout else 1.0 bitrate_kbps = int((file_size_bytes * 8) / duration_seconds / 1000) logger.debug(f"Stream {stream_index}: Calculated bitrate from file: {bitrate_kbps} kbps") except (ValueError, ZeroDivisionError): logger.warning(f"Stream {stream_index}: Could not parse duration from ffprobe") return 0 return bitrate_kbps except Exception as e: logger.warning(f"Failed to calculate bitrate for stream {stream_index}: {e}. Will fall back to metadata.") return 0 finally: # Clean up temporary audio file try: if os.path.exists(temp_audio_path): os.remove(temp_audio_path) logger.debug(f"Deleted temporary audio file: {temp_audio_path}") except Exception as e: logger.warning(f"Could not delete temporary file {temp_audio_path}: {e}") def get_audio_streams(input_file: Path): """ Detect audio streams and calculate robust bitrates by extracting each stream. Returns list of (index, channels, calculated_bitrate_kbps, language, metadata_bitrate_kbps, title) """ import re # First, get full ffprobe output to extract language codes and titles probe_cmd = ["ffprobe", "-v", "info", str(input_file)] probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore') # Parse language and title from output language_map = {} title_map = {} stderr_output = probe_result.stderr if probe_result.stderr else "" for line in stderr_output.split("\n"): # Match "Stream #0:X(YYY)" where X is stream number, YYY is language match = re.search(r"Stream #0:(\d+)\((\w{3})\)", line) if match: stream_idx = int(match.group(1)) lang_code = match.group(2) language_map[stream_idx] = lang_code # Get audio stream details via JSON with tags cmd = [ "ffprobe","-v","error","-select_streams","a", "-show_entries","stream=index,channels,bit_rate,tags", "-of","json", str(input_file) ] result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore') try: data = json.loads(result.stdout) if result.stdout else {"streams": []} except (json.JSONDecodeError, TypeError): data = {"streams": []} streams = [] for stream_num, s in enumerate(data.get("streams", [])): index = s["index"] channels = s.get("channels", 2) # Get language from our parsed map, default to "und" src_lang = language_map.get(index, "und") # Get title from tags or from our parsed map title = "" if "tags" in s and "title" in s["tags"]: title = s["tags"]["title"] elif index in title_map: title = title_map[index] bit_rate_meta = int(s.get("bit_rate", 0)) if s.get("bit_rate") else 0 # Calculate robust bitrate by extracting the audio stream calculated_bitrate_kbps = calculate_stream_bitrate(input_file, stream_num) # If calculation failed, fall back to metadata if calculated_bitrate_kbps == 0: calculated_bitrate_kbps = int(bit_rate_meta / 1000) if bit_rate_meta else 160 logger.info(f"Stream {index}: Using fallback bitrate {calculated_bitrate_kbps} kbps") streams.append((index, channels, calculated_bitrate_kbps, src_lang, int(bit_rate_meta / 1000) if bit_rate_meta else 0, title)) return streams def choose_audio_bitrate(channels: int, bitrate_kbps: int, audio_config: dict, is_1080_class: bool) -> tuple: """ Choose audio codec and bitrate based on channel count, detected bitrate, and resolution. Returns tuple: (codec, target_bitrate_bps) - codec: "aac" (stereo), "eac3" (5.1), or "copy" (preserve original) - target_bitrate_bps: target bitrate in bits/sec (0 if using "copy") Rules: Stereo + 1080p: - Above 192k → encode to 192k with AAC - At/below 192k → preserve (copy) Stereo + 720p: - Above 160k → encode to 160k with AAC - At/below 160k → preserve (copy) Multi-channel (5.1+): - Below minimum threshold → preserve original (copy) - Low to medium → use EAC3 codec """ # Normalize to 2ch or 6ch output output_channels = 6 if channels >= 6 else 2 if output_channels == 2: # Stereo logic - use AAC if is_1080_class: # 1080p+ stereo high_br = audio_config["stereo"]["high"] if bitrate_kbps > (high_br / 1000): # Above 192k return ("aac", high_br) else: # Preserve original logger.info(f"Stereo audio {bitrate_kbps}kbps ≤ {high_br/1000:.0f}k threshold - copying original") return ("copy", 0) else: # 720p stereo medium_br = audio_config["stereo"]["medium"] if bitrate_kbps > (medium_br / 1000): # Above 160k return ("aac", medium_br) else: # Preserve original logger.info(f"Stereo audio {bitrate_kbps}kbps ≤ {medium_br/1000:.0f}k threshold - copying original") return ("copy", 0) else: # Multi-channel (6ch+) logic - use EAC3 low_br = audio_config["multi_channel"]["low"] medium_br = audio_config["multi_channel"]["medium"] # If below the lowest threshold, copy the original audio instead of re-encoding if bitrate_kbps < (low_br / 1000): logger.info(f"Multi-channel audio {bitrate_kbps}kbps < {low_br/1000:.0f}k minimum - copying original to avoid artifical inflation") return ("copy", 0) elif bitrate_kbps < (medium_br / 1000): # Below medium, use low with EAC3 return ("eac3", low_br) else: # Medium and above, use medium with EAC3 return ("eac3", medium_br) def filter_audio_streams(input_file: Path, streams: list) -> list: """ Filter audio streams to keep only best English audio + Commentary tracks. Args: input_file: Path to video file streams: List of (index, channels, bitrate, language, metadata, title) tuples Returns: Filtered list of streams (original indices preserved for FFmpeg mapping) """ if not streams: return streams # Try to get stream metadata (title) to detect commentary english_tracks = [] commentary_tracks = [] for stream_info in streams: index, channels, bitrate, language, metadata, title = stream_info # Check if commentary (in title or metadata) is_commentary = "comment" in str(title).lower() or "comment" in str(metadata).lower() # Determine if English (check language field or assume first is English if no language set) is_english = (language and "eng" in language.lower()) or (not language) if is_commentary: commentary_tracks.append((index, channels, bitrate, stream_info)) elif is_english: english_tracks.append((index, channels, bitrate, stream_info)) # If no English tracks, return original if not english_tracks: logger.info("No English audio tracks detected - keeping all audio") return streams # Pick best English track (most channels, then highest bitrate) english_tracks.sort(key=lambda x: (-x[1], -x[2])) # Sort by channels desc, then bitrate desc best_english = english_tracks[0][3] # Get original stream tuple logger.info(f"Audio filter: Keeping best English track (index {best_english[0]}: {best_english[1]}ch @ {best_english[2]}kbps)") # Build result: best English + all commentary filtered = [best_english] + [ct[3] for ct in commentary_tracks] if commentary_tracks: logger.info(f"Audio filter: Also keeping {len(commentary_tracks)} commentary track(s)") # Log removed tracks removed_count = len(streams) - len(filtered) if removed_count > 0: logger.info(f"Audio filter: Removed {removed_count} non-English audio track(s)") return filtered def prompt_user_audio_selection(streams: list) -> list: """ Interactively prompt user to select which audio streams to keep. Args: streams: List of (index, channels, bitrate, language, metadata, title) tuples Returns: Filtered list containing only selected streams """ if not streams or len(streams) <= 1: return streams print("\n" + "="*80) print("🎵 AUDIO STREAM SELECTION") print("="*80) # Display all streams with details for index, channels, bitrate, language, metadata, title in streams: channels_display = f"{channels}ch" lang_display = language if language != "und" else "undefined" # Display title if available if title: title_display = f" | {title}" else: title_display = "" print(f"\nStream #{index}: {channels_display} | Lang: {lang_display} | Bitrate: {bitrate}kbps{title_display}") print("\n" + "-"*80) print("Enter stream numbers to keep (comma-separated, e.g.: 1,2 or just 2)") print("Leave blank to keep all streams") print("-"*80) user_input = input("➜ Keep streams: ").strip() # If empty, keep all if not user_input: print("✅ Keeping all audio streams\n") return streams # Parse user input try: selected_indices = set() for part in user_input.split(","): idx = int(part.strip()) selected_indices.add(idx) except ValueError: print("❌ Invalid input. Keeping all streams.") logger.warning("User provided invalid audio selection input") return streams # Filter streams to only selected ones filtered = [s for s in streams if s[0] in selected_indices] if not filtered: print("❌ No valid streams selected. Keeping all streams.") logger.warning("User selected no valid streams") return streams # Log what was selected/removed removed_count = len(streams) - len(filtered) print(f"✅ Keeping {len(filtered)} stream(s), removing {removed_count} stream(s)\n") logger.info(f"User selected {len(filtered)} audio stream(s): {[s[0] for s in filtered]}") if removed_count > 0: removed_indices = [s[0] for s in streams if s[0] not in selected_indices] logger.info(f"Removed {removed_count} audio stream(s): {removed_indices}") # Return filtered streams without strip_title field - let prompt_for_title_stripping handle that return filtered def prompt_for_title_stripping(filtered_streams: list) -> list: """ Prompt user to select which streams should have titles stripped. Args: filtered_streams: List of (index, channels, bitrate, language, metadata, title, strip_title) tuples Returns: Same list with strip_title field updated based on user selection """ streams_with_titles = [(s[0], s[5]) for s in filtered_streams if s[5]] if not streams_with_titles: return [s + (False,) if len(s) == 6 else s for s in filtered_streams] print("\n" + "="*80) print("📝 TITLE METADATA STRIPPING (Optional)") print("="*80) print("\nStreams with titles that can be stripped:\n") for idx, title in streams_with_titles: print(f" Stream #{idx}: \"{title}\"") print("\n" + "-"*80) print("Enter stream numbers to STRIP titles (comma-separated, or leave blank to keep all)") print("Example: \"1,3\" will strip titles from streams #1 and #3") print("-"*80) strip_input = input("➜ Strip titles from: ").strip() strip_indices = set() if strip_input: try: for part in strip_input.split(","): idx = int(part.strip()) strip_indices.add(idx) except ValueError: print("❌ Invalid input. Keeping all titles.\n") logger.warning("Invalid title stripping input") # Add strip_title field to each stream result = [] for s in filtered_streams: should_strip = s[0] in strip_indices result.append(s + (should_strip,)) if strip_indices: print(f"✅ Will strip titles from stream(s): {sorted(list(strip_indices))}\n") logger.info(f"User selected to strip titles from streams: {sorted(list(strip_indices))}") else: print("✅ Keeping all titles\n") return result