# core/audio_handler.py
"""Audio stream detection, bitrate calculation, and codec selection."""

import json
import os
import subprocess
import tempfile
from pathlib import Path

from core.logger_helper import setup_logger

logger = setup_logger(Path(__file__).parent.parent / "logs")


def _parse_ffmpeg_bitrate(ffmpeg_output: str) -> int:
    """
    Return the bitrate (kbps) from the LAST parseable ffmpeg progress line.

    Progress lines look like:
        "size=  352162KiB time=01:45:03.05 bitrate= 457.7kbits/s speed=..."
    Early progress lines only cover the first moments of the stream, so the
    final line (the whole-stream average) is the one we want. Lines whose
    value is not numeric (e.g. "bitrate=N/A") are skipped.

    Returns 0 when no usable bitrate is present.
    """
    for line in reversed(ffmpeg_output.splitlines()):
        _, marker, tail = line.partition("bitrate=")
        if not marker:
            continue
        value = tail.split("kbits/s")[0].strip()
        try:
            return int(float(value))
        except (ValueError, OverflowError):
            # "N/A", empty, nan/inf -> try an earlier progress line
            continue
    return 0


def _metadata_bitrate_bps(stream: dict) -> int:
    """Return the stream's `bit_rate` tag as an int (bits/sec), or 0 if missing/non-numeric."""
    try:
        return int(stream.get("bit_rate") or 0)
    except (TypeError, ValueError):
        # ffprobe may report a non-numeric placeholder such as "N/A"
        return 0


def calculate_stream_bitrate(input_file: Path, stream_index: int) -> int:
    """
    Extract one audio stream to a temporary file using -c copy and measure its bitrate.

    Args:
        input_file: media file to read.
        stream_index: audio-relative stream index (used as ffmpeg's "0:a:N").

    Returns:
        Bitrate in kbps. Returns 0 if extraction or measurement fails, in which
        case the caller is expected to fall back to container metadata.

    The bitrate is taken from ffmpeg's final progress line; if that cannot be
    parsed it is computed from the extracted file's size and duration.
    """
    # Matroska audio accepts practically any codec under stream copy. An ".aac"
    # (ADTS) target only accepts AAC, so AC-3/DTS/Opus/... extraction would fail.
    temp_fd, temp_audio_path = tempfile.mkstemp(suffix=".mka")
    os.close(temp_fd)  # ffmpeg writes to the path itself; we only need the name

    try:
        # Step 1: Extract audio stream with -c copy (lossless extraction).
        # ffmpeg writes its progress/bitrate info to stderr.
        extract_cmd = [
            "ffmpeg", "-y", "-i", str(input_file),
            "-map", f"0:a:{stream_index}",
            "-c", "copy",
            temp_audio_path,
        ]
        logger.debug(f"Extracting audio stream {stream_index} to temporary file for bitrate calculation...")
        result = subprocess.run(extract_cmd, capture_output=True, text=True, check=True)

        # Step 2: Prefer the bitrate ffmpeg itself reported.
        bitrate_kbps = _parse_ffmpeg_bitrate(result.stderr)
        if bitrate_kbps:
            logger.debug(f"Stream {stream_index}: Extracted bitrate from ffmpeg output: {bitrate_kbps} kbps")
            return bitrate_kbps

        # Step 3: Fall back to size / duration of the extracted file.
        logger.debug(f"Stream {stream_index}: Could not parse bitrate from ffmpeg output, calculating from file size...")
        file_size_bytes = os.path.getsize(temp_audio_path)
        duration_cmd = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1:noprint_wrappers=1",
            temp_audio_path,
        ]
        duration_result = subprocess.run(duration_cmd, capture_output=True, text=True, check=True)
        duration_seconds = float(duration_result.stdout.strip())
        # A zero duration raises ZeroDivisionError, handled below like any other failure.
        bitrate_kbps = int((file_size_bytes * 8) / duration_seconds / 1000)
        logger.debug(f"Stream {stream_index}: Calculated bitrate from file: {bitrate_kbps} kbps")
        return bitrate_kbps

    except Exception as e:
        # Deliberate best-effort: any failure means "unknown", never a crash.
        logger.warning(f"Failed to calculate bitrate for stream {stream_index}: {e}. Will fall back to metadata.")
        return 0

    finally:
        # Clean up temporary audio file
        try:
            os.remove(temp_audio_path)
            logger.debug(f"Deleted temporary audio file: {temp_audio_path}")
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"Could not delete temporary file {temp_audio_path}: {e}")


def get_audio_streams(input_file: Path) -> list:
    """
    Detect audio streams and calculate robust bitrates by extracting each stream.

    Returns a list of tuples:
        (index, channels, calculated_bitrate_kbps, language, metadata_bitrate_kbps)
    where `index` is the stream index reported by ffprobe, `channels` defaults
    to 2 and `language` to "und" when absent, and `metadata_bitrate_kbps` is 0
    when the container carries no usable bit_rate.

    Raises:
        json.JSONDecodeError: if ffprobe produced no parseable JSON output.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "a",
        "-show_entries", "stream=index,channels,bit_rate,tags=language",
        "-of", "json",
        str(input_file),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        # Surface ffprobe's own explanation before the JSON parse (likely) fails.
        logger.warning(f"ffprobe exited with code {result.returncode} for {input_file}: {result.stderr.strip()}")
    data = json.loads(result.stdout)

    streams = []
    # stream_num is the audio-relative position needed for ffmpeg's "0:a:N";
    # s["index"] is the stream index as reported by ffprobe.
    for stream_num, s in enumerate(data.get("streams", [])):
        index = s["index"]
        channels = s.get("channels", 2)
        src_lang = s.get("tags", {}).get("language", "und")
        metadata_kbps = _metadata_bitrate_bps(s) // 1000

        # Calculate robust bitrate by extracting the audio stream
        calculated_bitrate_kbps = calculate_stream_bitrate(input_file, stream_num)

        # If calculation failed, fall back to metadata (or 160 kbps when there is none)
        if calculated_bitrate_kbps == 0:
            calculated_bitrate_kbps = metadata_kbps or 160
            logger.info(f"Stream {index}: Using fallback bitrate {calculated_bitrate_kbps} kbps")

        streams.append((index, channels, calculated_bitrate_kbps, src_lang, metadata_kbps))

    return streams


def choose_audio_bitrate(channels: int, bitrate_kbps: int, audio_config: dict, is_1080_class: bool) -> tuple:
    """
    Choose audio codec and bitrate based on channel count, detected bitrate, and resolution.

    Args:
        channels: source channel count; 6 or more is treated as multi-channel,
            anything else as stereo.
        bitrate_kbps: detected source bitrate in kbps.
        audio_config: mapping with audio_config["stereo"]["high"|"medium"] and
            audio_config["multi_channel"]["low"|"medium"], in bits/sec.
        is_1080_class: True to apply the 1080p stereo tier, False for the 720p tier.

    Returns tuple: (codec, target_bitrate_bps)
        - codec: "aac", or "copy" (to preserve original without re-encoding)
        - target_bitrate_bps: target bitrate in bits/sec (0 if using "copy")

    Rules:
        Stereo + 1080p:
            - Above stereo "high" (e.g. 192k) -> AAC at "high"
            - At/below -> preserve (copy)
        Stereo + 720p:
            - Above stereo "medium" (e.g. 160k) -> AAC at "medium"
            - At/below -> preserve (copy)
        Multi-channel:
            - Below multi_channel "medium" -> AAC at "low" (e.g. 384k)
            - "medium" and above -> AAC at "medium"
    """
    if channels >= 6:
        # Multi-channel output is always re-encoded; only the target tier varies.
        low_br = audio_config["multi_channel"]["low"]
        medium_br = audio_config["multi_channel"]["medium"]
        if bitrate_kbps < (medium_br / 1000):
            return ("aac", low_br)
        return ("aac", medium_br)

    # Stereo: re-encode only when the source exceeds the tier's ceiling.
    tier = "high" if is_1080_class else "medium"
    ceiling_bps = audio_config["stereo"][tier]
    if bitrate_kbps > (ceiling_bps / 1000):
        return ("aac", ceiling_bps)
    return ("copy", 0)