441 lines
17 KiB
Python
441 lines
17 KiB
Python
# core/audio_handler.py
|
|
"""Audio stream detection, bitrate calculation, and codec selection."""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
from core.logger_helper import setup_logger
|
|
|
|
logger = setup_logger(Path(__file__).parent.parent / "logs")
|
|
|
|
|
|
def _parse_ffmpeg_bitrate(ffmpeg_stderr: str) -> int:
    """Return the last numeric ``bitrate=`` value (kbps) in ffmpeg output, or 0 if none."""
    import re  # function-scope import keeps this helper self-contained

    # ffmpeg prints a running average on every progress update; only the final
    # update covers the whole stream, so the last numeric reading is the one we want.
    # Non-numeric readings such as "bitrate=N/A" simply do not match.
    readings = re.findall(r"bitrate=\s*(\d+(?:\.\d+)?)\s*kbits/s", ffmpeg_stderr or "")
    return int(float(readings[-1])) if readings else 0


def calculate_stream_bitrate(input_file: Path, stream_index: int) -> int:
    """
    Measure the bitrate of one audio stream by copying it to a temporary file.

    The stream is extracted with ``ffmpeg -c copy`` into a ``.mka`` file inside the
    project's ``processing`` directory. The bitrate is taken from ffmpeg's final
    progress line; if no numeric value can be parsed, it is computed from the size
    of the extracted file and its duration as reported by ffprobe.

    Args:
        input_file: Path to the media file to inspect.
        stream_index: Audio-relative stream ordinal (used as ``a:N`` / ``0:a:N``).

    Returns:
        Bitrate in kbps, or 0 when the file is missing/unreadable, extraction fails,
        or no usable duration is available (callers then fall back to metadata).
    """
    # Ensure input file exists and is readable
    input_file = Path(input_file)
    if not input_file.exists():
        logger.error(f"Input file does not exist: {input_file}")
        return 0

    if not os.access(input_file, os.R_OK):
        logger.error(f"Input file is not readable (permission denied): {input_file}")
        return 0

    # Use project processing directory for temp files
    processing_dir = Path(__file__).parent.parent / "processing"
    processing_dir.mkdir(exist_ok=True)

    # Determine the codec of this audio stream first (only used for logging)
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", f"a:{stream_index}",
        "-show_entries", "stream=codec_name",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(input_file)
    ]
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False)
        codec_name = probe_result.stdout.strip().lower() if probe_result.stdout and probe_result.returncode == 0 else "aac"
    except (OSError, ValueError, subprocess.SubprocessError):
        # Probe is best-effort; do not swallow KeyboardInterrupt/SystemExit here.
        codec_name = "aac"

    # Use MKA (Matroska Audio) which supports any codec
    # This is a universal container that works with AC3, AAC, FLAC, DTS, Opus, etc.
    temp_ext = ".mka"

    temp_fd, temp_audio_path = tempfile.mkstemp(suffix=temp_ext, dir=str(processing_dir))
    # Only the path is needed; ffmpeg overwrites the file itself (-y).
    os.close(temp_fd)

    try:
        # Step 1: Extract audio stream with -c copy (lossless extraction)
        # ffmpeg outputs bitrate info to stderr
        extract_cmd = [
            "ffmpeg", "-y", "-i", str(input_file),
            "-map", f"0:a:{stream_index}",
            "-c", "copy",
            temp_audio_path
        ]
        logger.debug(f"Extracting audio stream {stream_index} ({codec_name}) to temporary file for bitrate calculation...")
        result = subprocess.run(extract_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False)

        # Check if extraction succeeded
        if result.returncode != 0:
            logger.warning(f"Stream {stream_index}: ffmpeg extraction failed (return code {result.returncode})")
            if result.stderr:
                logger.debug(f"ffmpeg stderr: {result.stderr[:300]}")
            return 0

        # Step 2: Parse bitrate from ffmpeg's output (stderr), e.g.
        # "size= 352162KiB time=01:45:03.05 bitrate= 457.7kbits/s"
        bitrate_kbps = _parse_ffmpeg_bitrate(result.stderr)
        if bitrate_kbps:
            logger.debug(f"Stream {stream_index}: Extracted bitrate from ffmpeg output: {bitrate_kbps} kbps")
            return bitrate_kbps

        # If we couldn't parse bitrate from output, fall back to calculation
        logger.debug(f"Stream {stream_index}: Could not parse bitrate from ffmpeg output, calculating from file size...")
        file_size_bytes = os.path.getsize(temp_audio_path)

        # Get duration using ffprobe
        duration_cmd = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            temp_audio_path
        ]
        duration_result = subprocess.run(duration_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore', check=False)
        try:
            duration_seconds = float((duration_result.stdout or "").strip())
        except ValueError:
            duration_seconds = 0.0

        # A missing, unparsable or non-positive duration cannot yield a meaningful
        # bitrate; report failure instead of dividing by a made-up duration.
        if duration_seconds <= 0:
            logger.warning(f"Stream {stream_index}: Could not parse duration from ffprobe")
            return 0

        bitrate_kbps = int((file_size_bytes * 8) / duration_seconds / 1000)
        logger.debug(f"Stream {stream_index}: Calculated bitrate from file: {bitrate_kbps} kbps")
        return bitrate_kbps

    except Exception as e:
        logger.warning(f"Failed to calculate bitrate for stream {stream_index}: {e}. Will fall back to metadata.")
        return 0

    finally:
        # Clean up temporary audio file
        try:
            if os.path.exists(temp_audio_path):
                os.remove(temp_audio_path)
                logger.debug(f"Deleted temporary audio file: {temp_audio_path}")
        except Exception as e:
            logger.warning(f"Could not delete temporary file {temp_audio_path}: {e}")
|
|
|
|
|
|
def get_audio_streams(input_file: Path):
    """
    Detect audio streams and measure a bitrate for each by extracting the stream.

    Language codes are read from ffprobe's human-readable stream lines and, when a
    line does not yield one, from the stream's ``language`` tag in the JSON probe.

    Args:
        input_file: Path to the media file to inspect.

    Returns:
        List of (index, channels, calculated_bitrate_kbps, language,
        metadata_bitrate_kbps, title) tuples, one per audio stream. ``index`` is the
        stream index reported by ffprobe; ``language`` defaults to "und" and
        ``title`` to "".
    """
    import re

    # First, get full ffprobe output to extract language codes
    probe_cmd = ["ffprobe", "-v", "info", str(input_file)]
    probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore')

    # Match "Stream #0:X(YYY)" and also "Stream #0:X[0xN](YYY)": some ffprobe
    # builds/containers print a stream id between the index and the language.
    stream_lang_re = re.compile(r"Stream #0:(\d+)(?:\[0x[0-9a-fA-F]+\])?\((\w{3})\)")
    language_map = {}
    for line in (probe_result.stderr or "").split("\n"):
        match = stream_lang_re.search(line)
        if match:
            language_map[int(match.group(1))] = match.group(2)

    # Get audio stream details via JSON with tags
    cmd = [
        "ffprobe", "-v", "error", "-select_streams", "a",
        "-show_entries", "stream=index,channels,bit_rate,tags",
        "-of", "json", str(input_file)
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore')
    try:
        data = json.loads(result.stdout) if result.stdout else {"streams": []}
    except (json.JSONDecodeError, TypeError):
        data = {"streams": []}

    streams = []

    for stream_num, s in enumerate(data.get("streams", [])):
        index = s["index"]
        channels = s.get("channels", 2)
        tags = s.get("tags") or {}

        # Prefer the language parsed from the stream line, then the tag, then "und"
        src_lang = language_map.get(index) or tags.get("language") or "und"

        title = tags.get("title", "")

        # Metadata bitrate is optional and may be non-numeric; treat that as unknown
        try:
            bit_rate_meta = int(s.get("bit_rate") or 0)
        except (TypeError, ValueError):
            bit_rate_meta = 0
        meta_kbps = int(bit_rate_meta / 1000) if bit_rate_meta else 0

        # Calculate robust bitrate by extracting the audio stream.
        # stream_num is the audio-relative ordinal (a:N), not the container index.
        calculated_bitrate_kbps = calculate_stream_bitrate(input_file, stream_num)

        # If calculation failed, fall back to metadata (or 160 kbps when absent)
        if calculated_bitrate_kbps == 0:
            calculated_bitrate_kbps = meta_kbps if bit_rate_meta else 160
            logger.info(f"Stream {index}: Using fallback bitrate {calculated_bitrate_kbps} kbps")

        streams.append((index, channels, calculated_bitrate_kbps, src_lang, meta_kbps, title))

    return streams
|
|
|
|
|
|
def choose_audio_bitrate(channels: int, bitrate_kbps: int, audio_config: dict, is_1080_class: bool) -> tuple:
    """
    Pick the output codec and target bitrate for one audio stream.

    Sources with 6 or more channels are handled as multi-channel; everything else
    is handled as stereo.

    Args:
        channels: Channel count of the source stream.
        bitrate_kbps: Detected source bitrate in kbps.
        audio_config: Mapping with ``["stereo"]["high"|"medium"]`` and
            ``["multi_channel"]["low"|"medium"]`` targets in bits/sec.
        is_1080_class: Selects the stereo "high" target when true, "medium" otherwise.

    Returns:
        (codec, target_bitrate_bps) where codec is "aac", "eac3" or "copy";
        the bitrate is 0 for "copy".

    Rules:
        Stereo: source above the selected target → ("aac", target); otherwise copy.
        Multi-channel: below "low" → copy; below "medium" → ("eac3", low);
        otherwise → ("eac3", medium).
    """
    if channels >= 6:
        # Multi-channel (6ch+) logic - use EAC3
        surround_targets = audio_config["multi_channel"]
        floor_bps = surround_targets["low"]
        ceiling_bps = surround_targets["medium"]

        if bitrate_kbps < (floor_bps / 1000):
            # Re-encoding a source below the lowest tier would only inflate it
            logger.info(f"Multi-channel audio {bitrate_kbps}kbps < {floor_bps/1000:.0f}k minimum - copying original to avoid artifical inflation")
            return ("copy", 0)
        if bitrate_kbps < (ceiling_bps / 1000):
            return ("eac3", floor_bps)
        return ("eac3", ceiling_bps)

    # Stereo logic - use AAC; the resolution class selects which target applies
    tier_name = "high" if is_1080_class else "medium"
    cap_bps = audio_config["stereo"][tier_name]

    if bitrate_kbps > (cap_bps / 1000):
        return ("aac", cap_bps)

    # At or below the target: preserve original
    logger.info(f"Stereo audio {bitrate_kbps}kbps ≤ {cap_bps/1000:.0f}k threshold - copying original")
    return ("copy", 0)
|
|
|
|
def filter_audio_streams(input_file: Path, streams: list) -> list:
    """
    Reduce the audio streams to the best English track plus every commentary track.

    Args:
        input_file: Path to video file (not read by this function)
        streams: List of (index, channels, bitrate, language, metadata, title) tuples

    Returns:
        ``streams`` unchanged when it is empty or holds no English track; otherwise a
        new list with the best English stream first, followed by the commentary
        streams. The stream tuples themselves are passed through untouched.
    """
    if not streams:
        return streams

    english_candidates = []
    commentary = []

    for entry in streams:
        index, channels, bitrate, language, metadata, title = entry

        # Commentary is recognised by "comment" appearing in the title or metadata
        mentions_commentary = any("comment" in str(field).lower() for field in (title, metadata))

        # English when the language contains "eng", or when no language is set at all
        counts_as_english = (language and "eng" in language.lower()) or (not language)

        if mentions_commentary:
            commentary.append(entry)
        elif counts_as_english:
            english_candidates.append(entry)

    if not english_candidates:
        logger.info("No English audio tracks detected - keeping all audio")
        return streams

    # Most channels wins, then highest bitrate; ties go to the earliest stream
    best_english = min(english_candidates, key=lambda entry: (-entry[1], -entry[2]))

    logger.info(f"Audio filter: Keeping best English track (index {best_english[0]}: {best_english[1]}ch @ {best_english[2]}kbps)")

    filtered = [best_english, *commentary]

    if commentary:
        logger.info(f"Audio filter: Also keeping {len(commentary)} commentary track(s)")

    removed_count = len(streams) - len(filtered)
    if removed_count > 0:
        logger.info(f"Audio filter: Removed {removed_count} non-English audio track(s)")

    return filtered
|
|
|
|
|
|
def prompt_user_audio_selection(streams: list) -> list:
    """
    Ask the user on the console which audio streams should be kept.

    Nothing is asked when there are fewer than two streams. Blank input, input that
    is not a comma-separated list of integers, or a selection that matches no
    stream all result in every stream being kept.

    Args:
        streams: List of (index, channels, bitrate, language, metadata, title) tuples

    Returns:
        The original list, or a new list holding only the streams whose index
        was entered.
    """
    if not streams or len(streams) <= 1:
        return streams

    heavy_rule = "=" * 80
    light_rule = "-" * 80

    print("\n" + heavy_rule)
    print("🎵 AUDIO STREAM SELECTION")
    print(heavy_rule)

    # One summary line per stream; the title is appended only when present
    for index, channels, bitrate, language, metadata, title in streams:
        shown_lang = "undefined" if language == "und" else language
        title_suffix = f" | {title}" if title else ""
        print(f"\nStream #{index}: {channels}ch | Lang: {shown_lang} | Bitrate: {bitrate}kbps{title_suffix}")

    print("\n" + light_rule)
    print("Enter stream numbers to keep (comma-separated, e.g.: 1,2 or just 2)")
    print("Leave blank to keep all streams")
    print(light_rule)

    answer = input("➜ Keep streams: ").strip()

    if not answer:
        print("✅ Keeping all audio streams\n")
        return streams

    try:
        wanted = {int(token.strip()) for token in answer.split(",")}
    except ValueError:
        print("❌ Invalid input. Keeping all streams.")
        logger.warning("User provided invalid audio selection input")
        return streams

    kept = [stream for stream in streams if stream[0] in wanted]

    if not kept:
        print("❌ No valid streams selected. Keeping all streams.")
        logger.warning("User selected no valid streams")
        return streams

    dropped_total = len(streams) - len(kept)
    print(f"✅ Keeping {len(kept)} stream(s), removing {dropped_total} stream(s)\n")
    logger.info(f"User selected {len(kept)} audio stream(s): {[stream[0] for stream in kept]}")

    if dropped_total > 0:
        dropped_ids = [stream[0] for stream in streams if stream[0] not in wanted]
        logger.info(f"Removed {dropped_total} audio stream(s): {dropped_ids}")

    # No strip_title field is added here; the returned tuples keep their input shape
    return kept
|
|
|
|
|
|
def prompt_for_title_stripping(filtered_streams: list) -> list:
    """
    Ask the user which titled streams should have their title metadata stripped.

    Args:
        filtered_streams: List of stream tuples, either
            (index, channels, bitrate, language, metadata, title) or the same with a
            trailing strip_title flag.

    Returns:
        New list of 7-field tuples ending in a strip_title flag. When no stream has
        a title, nothing is asked: 6-field tuples gain ``False`` and 7-field tuples
        are returned as given. Otherwise the flag reflects the user's selection;
        blank or invalid input strips nothing.
    """
    streams_with_titles = [(s[0], s[5]) for s in filtered_streams if s[5]]

    if not streams_with_titles:
        return [s + (False,) if len(s) == 6 else s for s in filtered_streams]

    print("\n" + "="*80)
    print("📝 TITLE METADATA STRIPPING (Optional)")
    print("="*80)
    print("\nStreams with titles that can be stripped:\n")

    for idx, title in streams_with_titles:
        print(f"  Stream #{idx}: \"{title}\"")

    print("\n" + "-"*80)
    print("Enter stream numbers to STRIP titles (comma-separated, or leave blank to keep all)")
    print("Example: \"1,3\" will strip titles from streams #1 and #3")
    print("-"*80)

    strip_input = input("➜ Strip titles from: ").strip()

    strip_indices = set()
    if strip_input:
        try:
            # Parse everything before committing, so one bad token cannot leave a
            # partial selection behind after "Keeping all titles" has been printed.
            strip_indices = {int(part.strip()) for part in strip_input.split(",")}
        except ValueError:
            strip_indices = set()
            print("❌ Invalid input. Keeping all titles.\n")
            logger.warning("Invalid title stripping input")

    # Set the strip_title field on each stream. Slicing to the six base fields
    # replaces an existing flag instead of appending an eighth element.
    result = [tuple(s[:6]) + (s[0] in strip_indices,) for s in filtered_streams]

    if strip_indices:
        print(f"✅ Will strip titles from stream(s): {sorted(strip_indices)}\n")
        logger.info(f"User selected to strip titles from streams: {sorted(strip_indices)}")
    else:
        print("✅ Keeping all titles\n")

    return result