conversion_project/core/audio_handler.py
2026-01-08 18:52:06 -05:00

441 lines
17 KiB
Python

# core/audio_handler.py
"""Audio stream detection, bitrate calculation, and codec selection."""
import json
import os
import subprocess
import tempfile
from pathlib import Path
from core.logger_helper import setup_logger
# Module-level logger used by every function in this file. setup_logger is
# handed the "logs" directory located two levels above this file (the parent
# of the package directory) -- presumably where log files are written; confirm
# against core.logger_helper.
logger = setup_logger(Path(__file__).parent.parent / "logs")
def calculate_stream_bitrate(input_file: Path, stream_index: int) -> int:
    """
    Measure an audio stream's bitrate by stream-copying it to a temporary file.

    The stream is extracted with ``ffmpeg -c copy`` into a Matroska audio
    (``.mka``) file inside the project's ``processing`` directory. The bitrate
    is taken from the last progress line ffmpeg writes to stderr; if no usable
    value is found there, it is computed from the temp file's size and the
    duration ffprobe reports for it. The temp file is always removed.

    Args:
        input_file: Path to the media file to inspect.
        stream_index: Zero-based position among the file's audio streams
            (used as ``a:<n>`` / ``0:a:<n>``), not the container stream index.

    Returns:
        Bitrate in kbps, or 0 on any failure so the caller can fall back to
        container metadata.
    """
    # Ensure input file exists and is readable
    input_file = Path(input_file)
    if not input_file.exists():
        logger.error(f"Input file does not exist: {input_file}")
        return 0
    if not os.access(input_file, os.R_OK):
        logger.error(f"Input file is not readable (permission denied): {input_file}")
        return 0

    # The codec name is only used in the debug log below, so a failed probe
    # is not fatal: keep the "aac" placeholder and carry on.
    codec_name = "aac"
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", f"a:{stream_index}",
        "-show_entries", "stream=codec_name",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(input_file)
    ]
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False)
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        logger.debug(f"Stream {stream_index}: codec probe failed ({e}); assuming {codec_name}")
    else:
        detected_codec = (probe_result.stdout or "").strip().lower()
        if probe_result.returncode == 0 and detected_codec:
            codec_name = detected_codec

    # Use project processing directory for temp files. MKA (Matroska Audio)
    # is used because it can hold any codec (AC3, AAC, FLAC, DTS, Opus, ...).
    processing_dir = Path(__file__).parent.parent / "processing"
    try:
        processing_dir.mkdir(exist_ok=True)
        temp_fd, temp_audio_path = tempfile.mkstemp(suffix=".mka", dir=str(processing_dir))
        os.close(temp_fd)
    except OSError as e:
        # Honour the documented contract: a failure means "0, use metadata".
        logger.warning(f"Stream {stream_index}: could not create temporary file in {processing_dir}: {e}")
        return 0

    try:
        # Step 1: Extract audio stream with -c copy (lossless extraction);
        # ffmpeg reports progress, including bitrate, on stderr.
        extract_cmd = [
            "ffmpeg", "-y", "-i", str(input_file),
            "-map", f"0:a:{stream_index}",
            "-c", "copy",
            temp_audio_path
        ]
        logger.debug(f"Extracting audio stream {stream_index} ({codec_name}) to temporary file for bitrate calculation...")
        result = subprocess.run(extract_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False)
        if result.returncode != 0:
            logger.warning(f"Stream {stream_index}: ffmpeg extraction failed (return code {result.returncode})")
            if result.stderr:
                logger.debug(f"ffmpeg stderr: {result.stderr[:300]}")
            return 0

        # Step 2: Prefer the bitrate ffmpeg itself reported.
        bitrate_kbps = _parse_final_bitrate_kbps(result.stderr or "")
        if bitrate_kbps > 0:
            logger.debug(f"Stream {stream_index}: Extracted bitrate from ffmpeg output: {bitrate_kbps} kbps")
            return bitrate_kbps

        # Step 3: Nothing usable in the output; derive it from size / duration.
        logger.debug(f"Stream {stream_index}: Could not parse bitrate from ffmpeg output, calculating from file size...")
        return _bitrate_from_file_size(temp_audio_path, stream_index)
    except Exception as e:
        logger.warning(f"Failed to calculate bitrate for stream {stream_index}: {e}. Will fall back to metadata.")
        return 0
    finally:
        # Clean up temporary audio file
        try:
            if os.path.exists(temp_audio_path):
                os.remove(temp_audio_path)
                logger.debug(f"Deleted temporary audio file: {temp_audio_path}")
        except OSError as e:
            logger.warning(f"Could not delete temporary file {temp_audio_path}: {e}")


def _parse_final_bitrate_kbps(ffmpeg_stderr: str) -> int:
    """Return the last ``bitrate=<n>kbits/s`` value in ffmpeg's stderr as whole kbps, or 0."""
    import re

    # ffmpeg emits many progress updates such as
    # "size= 352162KiB time=01:45:03.05 bitrate= 457.7kbits/s"; only the final
    # one covers the whole stream, so earlier (partial) values are ignored.
    # "bitrate=N/A" never matches the numeric pattern.
    matches = re.findall(r"bitrate=\s*([0-9]+(?:\.[0-9]+)?)\s*kbits/s", ffmpeg_stderr)
    if not matches:
        return 0
    return int(float(matches[-1]))


def _bitrate_from_file_size(audio_path: str, stream_index: int) -> int:
    """Compute kbps from the file's size and its ffprobe duration; 0 if the duration is unusable."""
    file_size_bytes = os.path.getsize(audio_path)
    duration_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        audio_path
    ]
    duration_result = subprocess.run(duration_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False)
    try:
        duration_seconds = float((duration_result.stdout or "").strip())
    except ValueError:
        duration_seconds = 0.0
    # Written as "not > 0" so that NaN, zero, negative and missing durations
    # are all rejected instead of producing a meaningless bitrate.
    if not duration_seconds > 0:
        logger.warning(f"Stream {stream_index}: Could not parse duration from ffprobe")
        return 0
    bitrate_kbps = int((file_size_bytes * 8) / duration_seconds / 1000)
    logger.debug(f"Stream {stream_index}: Calculated bitrate from file: {bitrate_kbps} kbps")
    return bitrate_kbps
def get_audio_streams(input_file: Path):
    """
    Detect audio streams and calculate robust bitrates by extracting each stream.

    Runs ffprobe twice: once in plain ``-v info`` mode to read the language
    code printed on each ``Stream #0:N(lang)`` line, and once with JSON output
    to read index, channel count, metadata bitrate and tags. Each stream is
    then measured with ``calculate_stream_bitrate``; if that returns 0 the
    metadata bitrate is used, or 160 kbps when no metadata bitrate exists.

    Args:
        input_file: Path to the media file to inspect.

    Returns:
        List of tuples
        ``(index, channels, calculated_bitrate_kbps, language,
        metadata_bitrate_kbps, title)``. ``index`` is the container stream
        index; ``language`` is ``"und"`` when unknown and ``title`` is ``""``
        when absent.
    """
    import re

    # First pass: human-readable ffprobe output (written to stderr) carries
    # the language right after the stream id. Some containers print a hex id
    # in between, e.g. "Stream #0:1[0x1100](eng)", so that part is optional.
    stream_lang_re = re.compile(r"Stream #0:(\d+)(?:\[0x[0-9a-fA-F]+\])?\(([^)]+)\)")
    probe_cmd = ["ffprobe", "-v", "info", str(input_file)]
    probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore')
    language_map = {}
    for line in (probe_result.stderr or "").splitlines():
        match = stream_lang_re.search(line)
        if match:
            language_map[int(match.group(1))] = match.group(2)

    # Second pass: audio stream details via JSON with tags
    cmd = [
        "ffprobe", "-v", "error", "-select_streams", "a",
        "-show_entries", "stream=index,channels,bit_rate,tags",
        "-of", "json", str(input_file)
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore')
    try:
        data = json.loads(result.stdout) if result.stdout else {}
    except (json.JSONDecodeError, TypeError):
        data = {}
    if not isinstance(data, dict):
        data = {}

    streams = []
    # stream_num is the position among audio streams (what ffmpeg's "a:N"
    # selector expects); index is the container-wide stream index.
    for stream_num, s in enumerate(data.get("streams", [])):
        index = s["index"]
        channels = s.get("channels", 2)
        tags = s.get("tags") or {}

        # Language: text-probe result first, then the JSON tag, then "und".
        src_lang = language_map.get(index) or tags.get("language") or "und"
        title = tags.get("title", "")

        # A missing or non-numeric metadata bitrate counts as "unknown" (0).
        try:
            bit_rate_meta = int(s.get("bit_rate") or 0)
        except (TypeError, ValueError):
            bit_rate_meta = 0
        metadata_bitrate_kbps = int(bit_rate_meta / 1000) if bit_rate_meta else 0

        # Calculate robust bitrate by extracting the audio stream
        calculated_bitrate_kbps = calculate_stream_bitrate(input_file, stream_num)
        # If calculation failed, fall back to metadata
        if calculated_bitrate_kbps == 0:
            calculated_bitrate_kbps = metadata_bitrate_kbps if bit_rate_meta else 160
            logger.info(f"Stream {index}: Using fallback bitrate {calculated_bitrate_kbps} kbps")

        streams.append((index, channels, calculated_bitrate_kbps, src_lang, metadata_bitrate_kbps, title))
    return streams
def choose_audio_bitrate(channels: int, bitrate_kbps: int, audio_config: dict, is_1080_class: bool) -> tuple:
    """
    Pick the output audio codec and target bitrate for one stream.

    Streams with six or more channels are treated as multi-channel; everything
    else is treated as stereo.

    Args:
        channels: Channel count of the source stream.
        bitrate_kbps: Detected source bitrate in kbps.
        audio_config: Mapping with ``["stereo"]["high"]``,
            ``["stereo"]["medium"]``, ``["multi_channel"]["low"]`` and
            ``["multi_channel"]["medium"]``; each value is divided by 1000
            before being compared with ``bitrate_kbps`` and is returned
            unchanged as the target.
        is_1080_class: Truthy selects the stereo "high" tier, falsy the
            "medium" tier. Has no effect on multi-channel streams.

    Returns:
        ``(codec, target_bitrate)`` where codec is ``"aac"``, ``"eac3"`` or
        ``"copy"``; the target is 0 whenever the codec is ``"copy"``.

        Stereo: source above the tier value -> ``("aac", tier value)``,
        otherwise copy.
        Multi-channel: source below "low" -> copy; below "medium" ->
        ``("eac3", low)``; otherwise ``("eac3", medium)``.
    """
    if channels >= 6:
        # Multi-channel path: both thresholds are read up front.
        floor_bps = audio_config["multi_channel"]["low"]
        ceiling_bps = audio_config["multi_channel"]["medium"]

        if bitrate_kbps < (floor_bps / 1000):
            # Re-encoding would only raise the bitrate, so keep the source.
            logger.info(f"Multi-channel audio {bitrate_kbps}kbps < {floor_bps/1000:.0f}k minimum - copying original to avoid artifical inflation")
            return ("copy", 0)
        if bitrate_kbps < (ceiling_bps / 1000):
            return ("eac3", floor_bps)
        return ("eac3", ceiling_bps)

    # Stereo path: the resolution class decides which tier is the cap.
    tier = "high" if is_1080_class else "medium"
    cap_bps = audio_config["stereo"][tier]
    if bitrate_kbps > (cap_bps / 1000):
        return ("aac", cap_bps)

    # At or below the cap: keep the source stream untouched.
    logger.info(f"Stereo audio {bitrate_kbps}kbps ≤ {cap_bps/1000:.0f}k threshold - copying original")
    return ("copy", 0)
def filter_audio_streams(input_file: Path, streams: list) -> list:
    """
    Reduce the audio streams to the best English track plus all commentary tracks.

    A stream counts as commentary when "comment" appears (case-insensitively)
    in the string form of its title or of its metadata field. Otherwise it
    counts as English when its language is empty or contains "eng". The best
    English track is the one with the most channels, ties broken by the
    higher bitrate, then by original order.

    Args:
        input_file: Path to video file (not used by this function).
        streams: List of (index, channels, bitrate, language, metadata, title) tuples

    Returns:
        ``streams`` itself when it is empty or when no English track was
        found; otherwise a new list holding the best English tuple followed
        by the commentary tuples in their original order. Tuples are returned
        unmodified, so stream indices are preserved for FFmpeg mapping.
    """
    if not streams:
        return streams

    english = []
    commentary = []
    for entry in streams:
        index, channels, bitrate, language, metadata, title = entry
        # Both flags are evaluated for every stream; commentary takes priority.
        looks_like_commentary = "comment" in str(title).lower() or "comment" in str(metadata).lower()
        looks_english = not language or "eng" in language.lower()
        if looks_like_commentary:
            commentary.append(entry)
        elif looks_english:
            english.append(entry)

    if not english:
        logger.info("No English audio tracks detected - keeping all audio")
        return streams

    # Smallest (-channels, -bitrate) == most channels, then highest bitrate;
    # min() returns the first such entry, matching a stable ascending sort.
    best_english = min(english, key=lambda entry: (-entry[1], -entry[2]))
    logger.info(f"Audio filter: Keeping best English track (index {best_english[0]}: {best_english[1]}ch @ {best_english[2]}kbps)")

    filtered = [best_english, *commentary]
    if commentary:
        logger.info(f"Audio filter: Also keeping {len(commentary)} commentary track(s)")

    removed_count = len(streams) - len(filtered)
    if removed_count > 0:
        logger.info(f"Audio filter: Removed {removed_count} non-English audio track(s)")
    return filtered
def prompt_user_audio_selection(streams: list) -> list:
    """
    Ask the user on the console which audio streams to keep.

    Nothing is asked when there are fewer than two streams. Otherwise every
    stream is listed and the user enters a comma-separated list of stream
    indices. A blank answer, an answer containing anything that is not an
    integer, or an answer matching no listed stream all keep every stream.

    Args:
        streams: List of (index, channels, bitrate, language, metadata, title) tuples

    Returns:
        ``streams`` itself when nothing was filtered out by the rules above,
        otherwise a new list with only the selected tuples, in original order.
    """
    if not streams or len(streams) <= 1:
        return streams

    heavy_rule = "=" * 80
    light_rule = "-" * 80

    print("\n" + heavy_rule)
    print("🎵 AUDIO STREAM SELECTION")
    print(heavy_rule)

    # One summary line per stream; the title is appended only when present.
    for index, channels, bitrate, language, metadata, title in streams:
        lang_display = "undefined" if language == "und" else language
        title_display = f" | {title}" if title else ""
        print(f"\nStream #{index}: {channels}ch | Lang: {lang_display} | Bitrate: {bitrate}kbps{title_display}")

    print("\n" + light_rule)
    print("Enter stream numbers to keep (comma-separated, e.g.: 1,2 or just 2)")
    print("Leave blank to keep all streams")
    print(light_rule)

    answer = input("➜ Keep streams: ").strip()
    if not answer:
        print("✅ Keeping all audio streams\n")
        return streams

    # Any non-integer piece invalidates the whole answer.
    try:
        selected_indices = {int(piece.strip()) for piece in answer.split(",")}
    except ValueError:
        print("❌ Invalid input. Keeping all streams.")
        logger.warning("User provided invalid audio selection input")
        return streams

    filtered = [s for s in streams if s[0] in selected_indices]
    if not filtered:
        print("❌ No valid streams selected. Keeping all streams.")
        logger.warning("User selected no valid streams")
        return streams

    removed_count = len(streams) - len(filtered)
    kept_indices = [s[0] for s in filtered]
    print(f"✅ Keeping {len(filtered)} stream(s), removing {removed_count} stream(s)\n")
    logger.info(f"User selected {len(filtered)} audio stream(s): {kept_indices}")
    if removed_count > 0:
        removed_indices = [s[0] for s in streams if s[0] not in selected_indices]
        logger.info(f"Removed {removed_count} audio stream(s): {removed_indices}")

    # No strip_title field is added here; prompt_for_title_stripping does that.
    return filtered
def prompt_for_title_stripping(filtered_streams: list) -> list:
    """
    Ask the user on the console which streams should have their title removed.

    Nothing is asked when no stream has a title. Otherwise the titled streams
    are listed and the user enters a comma-separated list of stream indices.
    A blank answer, or an answer containing anything that is not an integer,
    strips no titles at all.

    Args:
        filtered_streams: List of
            (index, channels, bitrate, language, metadata, title) tuples,
            optionally already carrying a seventh ``strip_title`` field.

    Returns:
        New list of 7-field tuples
        (index, channels, bitrate, language, metadata, title, strip_title).
        When the user was prompted, ``strip_title`` reflects the answer and
        replaces any value already present. When no stream has a title,
        6-field tuples get ``False`` appended and longer tuples are returned
        unchanged.
    """
    streams_with_titles = [(s[0], s[5]) for s in filtered_streams if s[5]]
    if not streams_with_titles:
        return [s + (False,) if len(s) == 6 else s for s in filtered_streams]

    print("\n" + "="*80)
    print("📝 TITLE METADATA STRIPPING (Optional)")
    print("="*80)
    print("\nStreams with titles that can be stripped:\n")
    for idx, title in streams_with_titles:
        print(f" Stream #{idx}: \"{title}\"")
    print("\n" + "-"*80)
    print("Enter stream numbers to STRIP titles (comma-separated, or leave blank to keep all)")
    print("Example: \"1,3\" will strip titles from streams #1 and #3")
    print("-"*80)
    strip_input = input("➜ Strip titles from: ").strip()

    strip_indices = set()
    if strip_input:
        try:
            # Parse the whole answer before committing to it, so a bad piece
            # such as "1,abc" cannot leave a partial selection behind after
            # the user has been told that all titles are kept.
            strip_indices = {int(part.strip()) for part in strip_input.split(",")}
        except ValueError:
            strip_indices = set()
            print("❌ Invalid input. Keeping all titles.\n")
            logger.warning("Invalid title stripping input")

    # Set the strip_title field on each stream. Slicing to the six base
    # fields replaces an existing flag instead of appending an eighth field.
    result = [s[:6] + (s[0] in strip_indices,) for s in filtered_streams]

    if strip_indices:
        print(f"✅ Will strip titles from stream(s): {sorted(list(strip_indices))}\n")
        logger.info(f"User selected to strip titles from streams: {sorted(list(strip_indices))}")
    else:
        print("✅ Keeping all titles\n")
    return result